Skip to main content

hydro_lang/live_collections/stream/
mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::singleton::SingletonBound;
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::{Atomic, DeferTick};
30use crate::location::{Location, Tick, TopLevel, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::prelude::manual_proof;
34use crate::properties::{
35    AggFuncAlgebra, ApplyMonotoneStream, StreamMapFuncAlgebra, ValidCommutativityFor,
36    ValidIdempotenceFor, ValidMutBorrowCommutativityFor, ValidMutBorrowIdempotenceFor,
37    ValidMutCommutativityFor, ValidMutIdempotenceFor,
38};
39
40pub mod networking;
41
42/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
43#[sealed::sealed]
44pub trait Ordering:
45    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
46{
47    /// The [`StreamOrder`] corresponding to this type.
48    const ORDERING_KIND: StreamOrder;
49}
50
51/// Marks the stream as being totally ordered, which means that there are
52/// no sources of non-determinism (other than intentional ones) that will
53/// affect the order of elements.
54pub enum TotalOrder {}
55
56#[sealed::sealed]
57impl Ordering for TotalOrder {
58    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
59}
60
61/// Marks the stream as having no order, which means that the order of
62/// elements may be affected by non-determinism.
63///
64/// This restricts certain operators, such as `fold` and `reduce`, to only
65/// be used with commutative aggregation functions.
66pub enum NoOrder {}
67
68#[sealed::sealed]
69impl Ordering for NoOrder {
70    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
71}
72
73/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
74/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
75/// have `Self` guarantees instead.
76#[sealed::sealed]
77pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
78#[sealed::sealed]
79impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
80
81/// Helper trait for determining the weakest of two orderings.
82#[sealed::sealed]
83pub trait MinOrder<Other: ?Sized> {
84    /// The weaker of the two orderings.
85    type Min: Ordering;
86}
87
88#[sealed::sealed]
89impl<O: Ordering> MinOrder<O> for TotalOrder {
90    type Min = O;
91}
92
93#[sealed::sealed]
94impl<O: Ordering> MinOrder<O> for NoOrder {
95    type Min = NoOrder;
96}
97
98/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
99#[sealed::sealed]
100pub trait Retries:
101    MinRetries<Self, Min = Self>
102    + MinRetries<ExactlyOnce, Min = Self>
103    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
104{
105    /// The [`StreamRetry`] corresponding to this type.
106    const RETRIES_KIND: StreamRetry;
107}
108
109/// Marks the stream as having deterministic message cardinality, with no
110/// possibility of duplicates.
111pub enum ExactlyOnce {}
112
113#[sealed::sealed]
114impl Retries for ExactlyOnce {
115    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
116}
117
118/// Marks the stream as having non-deterministic message cardinality, which
119/// means that duplicates may occur, but messages will not be dropped.
120pub enum AtLeastOnce {}
121
122#[sealed::sealed]
123impl Retries for AtLeastOnce {
124    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
125}
126
127/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
128/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
129/// have `Self` guarantees instead.
130#[sealed::sealed]
131pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
132#[sealed::sealed]
133impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
134
135/// Helper trait for determining the weakest of two retry guarantees.
136#[sealed::sealed]
137pub trait MinRetries<Other: ?Sized> {
138    /// The weaker of the two retry guarantees.
139    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
140}
141
142#[sealed::sealed]
143impl<R: Retries> MinRetries<R> for ExactlyOnce {
144    type Min = R;
145}
146
147#[sealed::sealed]
148impl<R: Retries> MinRetries<R> for AtLeastOnce {
149    type Min = AtLeastOnce;
150}
151
152#[sealed::sealed]
153#[diagnostic::on_unimplemented(
154    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
155    label = "required here",
156    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
157)]
158/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
159pub trait IsOrdered: Ordering {}
160
161#[sealed::sealed]
162#[diagnostic::do_not_recommend]
163impl IsOrdered for TotalOrder {}
164
165#[sealed::sealed]
166#[diagnostic::on_unimplemented(
167    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
168    label = "required here",
169    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
170)]
171/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
172pub trait IsExactlyOnce: Retries {}
173
174#[sealed::sealed]
175#[diagnostic::do_not_recommend]
176impl IsExactlyOnce for ExactlyOnce {}
177
178/// Streaming sequence of elements with type `Type`.
179///
180/// This live collection represents a growing sequence of elements, with new elements being
181/// asynchronously appended to the end of the sequence. This can be used to model the arrival
182/// of network input, such as API requests, or streaming ingestion.
183///
184/// By default, all streams have deterministic ordering and each element is materialized exactly
185/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
186/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
187/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
188///
189/// Type Parameters:
190/// - `Type`: the type of elements in the stream
191/// - `Loc`: the location where the stream is being materialized
192/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
193/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
194///   (default is [`TotalOrder`])
195/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
196///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
197pub struct Stream<
198    Type,
199    Loc,
200    Bound: Boundedness = Unbounded,
201    Order: Ordering = TotalOrder,
202    Retry: Retries = ExactlyOnce,
203> {
204    pub(crate) location: Loc,
205    pub(crate) ir_node: RefCell<HydroNode>,
206    pub(crate) flow_state: FlowState,
207
208    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
209}
210
211impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
212    fn drop(&mut self) {
213        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
214        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
215            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
216                input: Box::new(ir_node),
217                op_metadata: HydroIrOpMetadata::new(),
218            });
219        }
220    }
221}
222
223impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
224    for Stream<T, L, Unbounded, O, R>
225where
226    L: Location<'a>,
227{
228    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
229        let new_meta = stream
230            .location
231            .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
232
233        Stream {
234            location: stream.location.clone(),
235            flow_state: stream.flow_state.clone(),
236            ir_node: RefCell::new(HydroNode::Cast {
237                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
238                metadata: new_meta,
239            }),
240            _phantom: PhantomData,
241        }
242    }
243}
244
245impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
246    for Stream<T, L, B, NoOrder, R>
247where
248    L: Location<'a>,
249{
250    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
251        stream.weaken_ordering()
252    }
253}
254
255impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
256    for Stream<T, L, B, O, AtLeastOnce>
257where
258    L: Location<'a>,
259{
260    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
261        stream.weaken_retries()
262    }
263}
264
265impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
266where
267    L: Location<'a>,
268{
269    fn defer_tick(self) -> Self {
270        Stream::defer_tick(self)
271    }
272}
273
274impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
275    for Stream<T, Tick<L>, Bounded, O, R>
276where
277    L: Location<'a>,
278{
279    type Location = Tick<L>;
280
281    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
282        Stream::new(
283            location.clone(),
284            HydroNode::CycleSource {
285                cycle_id,
286                metadata: location.new_node_metadata(Self::collection_kind()),
287            },
288        )
289    }
290}
291
292impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
293    for Stream<T, Tick<L>, Bounded, O, R>
294where
295    L: Location<'a>,
296{
297    type Location = Tick<L>;
298
299    fn location(&self) -> &Self::Location {
300        self.location()
301    }
302
303    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
304        let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
305            location.clone(),
306            HydroNode::DeferTick {
307                input: Box::new(HydroNode::CycleSource {
308                    cycle_id,
309                    metadata: location.new_node_metadata(Self::collection_kind()),
310                }),
311                metadata: location.new_node_metadata(Self::collection_kind()),
312            },
313        );
314
315        from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
316    }
317}
318
319impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
320    for Stream<T, Tick<L>, Bounded, O, R>
321where
322    L: Location<'a>,
323{
324    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
325        assert_eq!(
326            Location::id(&self.location),
327            expected_location,
328            "locations do not match"
329        );
330        self.location
331            .flow_state()
332            .borrow_mut()
333            .push_root(HydroRoot::CycleSink {
334                cycle_id,
335                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
336                op_metadata: HydroIrOpMetadata::new(),
337            });
338    }
339}
340
341impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
342    for Stream<T, L, B, O, R>
343where
344    L: Location<'a>,
345{
346    type Location = L;
347
348    fn create_source(cycle_id: CycleId, location: L) -> Self {
349        Stream::new(
350            location.clone(),
351            HydroNode::CycleSource {
352                cycle_id,
353                metadata: location.new_node_metadata(Self::collection_kind()),
354            },
355        )
356    }
357}
358
359impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
360    for Stream<T, L, B, O, R>
361where
362    L: Location<'a>,
363{
364    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
365        assert_eq!(
366            Location::id(&self.location),
367            expected_location,
368            "locations do not match"
369        );
370        self.location
371            .flow_state()
372            .borrow_mut()
373            .push_root(HydroRoot::CycleSink {
374                cycle_id,
375                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
376                op_metadata: HydroIrOpMetadata::new(),
377            });
378    }
379}
380
381impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
382where
383    T: Clone,
384    L: Location<'a>,
385{
386    fn clone(&self) -> Self {
387        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
388            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
389            *self.ir_node.borrow_mut() = HydroNode::Tee {
390                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
391                metadata: self.location.new_node_metadata(Self::collection_kind()),
392            };
393        }
394
395        let HydroNode::Tee { inner, metadata } = &*self.ir_node.borrow() else {
396            unreachable!()
397        };
398        Stream {
399            location: self.location.clone(),
400            flow_state: self.flow_state.clone(),
401            ir_node: HydroNode::Tee {
402                inner: SharedNode(inner.0.clone()),
403                metadata: metadata.clone(),
404            }
405            .into(),
406            _phantom: PhantomData,
407        }
408    }
409}
410
411impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
412where
413    L: Location<'a>,
414{
415    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
416        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
417        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
418
419        let flow_state = location.flow_state().clone();
420        Stream {
421            location,
422            flow_state,
423            ir_node: RefCell::new(ir_node),
424            _phantom: PhantomData,
425        }
426    }
427
428    /// Returns the [`Location`] where this stream is being materialized.
429    pub fn location(&self) -> &L {
430        &self.location
431    }
432
433    /// Creates a shared reference handle to this stream's handoff buffer that can be captured
434    /// inside `q!()` closures. The handle resolves to `&Vec<T>` at runtime.
435    ///
436    /// The stream must be bounded, otherwise reading it would be non-deterministic.
437    pub fn by_ref(&self) -> crate::handoff_ref::StreamRef<'a, '_, T, L>
438    where
439        B: IsBounded,
440    {
441        crate::handoff_ref::StreamRef::new(&self.ir_node)
442    }
443
444    /// Returns a mutable reference handle to this stream's handoff buffer that can be captured
445    /// inside `q!()` closures. The handle resolves to `&mut Vec<T>` at runtime.
446    pub fn by_mut(&self) -> crate::handoff_ref::StreamMut<'a, '_, T, L>
447    where
448        B: IsBounded,
449    {
450        crate::handoff_ref::StreamMut::new(&self.ir_node)
451    }
452
453    /// Weakens the consistency of this live collection to not guarantee any consistency across
454    /// cluster members (if this collection is on a cluster).
455    pub fn weaken_consistency(self) -> Stream<T, L::DropConsistency, B, O, R>
456    where
457        L: Location<'a>,
458    {
459        if L::consistency()
460            .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
461        {
462            // already no consistency
463            Stream::new(
464                self.location.drop_consistency(),
465                self.ir_node.replace(HydroNode::Placeholder),
466            )
467        } else {
468            Stream::new(
469                self.location.drop_consistency(),
470                HydroNode::Cast {
471                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
472                    metadata: self.location.drop_consistency().new_node_metadata(Stream::<
473                        T,
474                        L::DropConsistency,
475                        B,
476                        O,
477                        R,
478                    >::collection_kind(
479                    )),
480                },
481            )
482        }
483    }
484
485    /// Casts this live collection to have the consistency guarantees specified in the given
486    /// location type parameter. The developer must ensure that the strengthened consistency
487    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
488    pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
489        self,
490        _proof: impl crate::properties::ConsistencyProof,
491    ) -> Stream<T, L2, B, O, R>
492    where
493        L: Location<'a>,
494    {
495        if L::consistency() == L2::consistency() {
496            Stream::new(
497                self.location.with_consistency_of(),
498                self.ir_node.replace(HydroNode::Placeholder),
499            )
500        } else {
501            Stream::new(
502                self.location.with_consistency_of(),
503                HydroNode::AssertIsConsistent {
504                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
505                    trusted: false,
506                    metadata: self
507                        .location
508                        .clone()
509                        .with_consistency_of::<L2>()
510                        .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
511                },
512            )
513        }
514    }
515
516    pub(crate) fn assert_has_consistency_of_trusted<
517        L2: Location<'a, DropConsistency = L::DropConsistency>,
518    >(
519        self,
520        _proof: impl crate::properties::ConsistencyProof,
521    ) -> Stream<T, L2, B, O, R>
522    where
523        L: Location<'a>,
524    {
525        if L::consistency() == L2::consistency() {
526            Stream::new(
527                self.location.with_consistency_of(),
528                self.ir_node.replace(HydroNode::Placeholder),
529            )
530        } else {
531            Stream::new(
532                self.location.with_consistency_of(),
533                HydroNode::AssertIsConsistent {
534                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
535                    trusted: true,
536                    metadata: self
537                        .location
538                        .clone()
539                        .with_consistency_of::<L2>()
540                        .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
541                },
542            )
543        }
544    }
545
546    pub(crate) fn collection_kind() -> CollectionKind {
547        CollectionKind::Stream {
548            bound: B::BOUND_KIND,
549            order: O::ORDERING_KIND,
550            retry: R::RETRIES_KIND,
551            element_type: quote_type::<T>().into(),
552        }
553    }
554
555    /// Produces a stream based on invoking `f` on each element.
556    /// If you do not want to modify the stream and instead only want to view
557    /// each item use [`Stream::inspect`] instead.
558    ///
559    /// # Example
560    /// ```rust
561    /// # #[cfg(feature = "deploy")] {
562    /// # use hydro_lang::prelude::*;
563    /// # use futures::StreamExt;
564    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
565    /// let words = process.source_iter(q!(vec!["hello", "world"]));
566    /// words.map(q!(|x| x.to_uppercase()))
567    /// # }, |mut stream| async move {
568    /// # for w in vec!["HELLO", "WORLD"] {
569    /// #     assert_eq!(stream.next().await.unwrap(), w);
570    /// # }
571    /// # }));
572    /// # }
573    /// ```
574    pub fn map<U, F, C, I, const WAS_MUT: bool>(
575        self,
576        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, I>>,
577    ) -> Stream<U, L, B, O, R>
578    where
579        F: FnMut(T) -> U + 'a,
580        C: ValidMutCommutativityFor<F, T, U, O, WAS_MUT>,
581        I: ValidMutIdempotenceFor<F, T, U, R, WAS_MUT>,
582    {
583        let f = crate::handoff_ref::with_ref_capture(|| {
584            let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
585            proof.register_proof(&expr);
586            expr.into()
587        });
588        Stream::new(
589            self.location.clone(),
590            HydroNode::Map {
591                f,
592                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
593                metadata: self
594                    .location
595                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
596            },
597        )
598    }
599
600    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
601    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
602    /// for the output type `U` must produce items in a **deterministic** order.
603    ///
604    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
605    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
606    ///
607    /// # Example
608    /// ```rust
609    /// # #[cfg(feature = "deploy")] {
610    /// # use hydro_lang::prelude::*;
611    /// # use futures::StreamExt;
612    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
613    /// process
614    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
615    ///     .flat_map_ordered(q!(|x| x))
616    /// # }, |mut stream| async move {
617    /// // 1, 2, 3, 4
618    /// # for w in (1..5) {
619    /// #     assert_eq!(stream.next().await.unwrap(), w);
620    /// # }
621    /// # }));
622    /// # }
623    /// ```
624    pub fn flat_map_ordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
625        self,
626        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
627    ) -> Stream<U, L, B, O, R>
628    where
629        I: IntoIterator<Item = U>,
630        F: FnMut(T) -> I + 'a,
631        C: ValidMutCommutativityFor<F, T, I, O, WAS_MUT>,
632        Idemp: ValidMutIdempotenceFor<F, T, I, R, WAS_MUT>,
633    {
634        let f = crate::handoff_ref::with_ref_capture(|| {
635            let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
636            proof.register_proof(&expr);
637            expr.into()
638        });
639        Stream::new(
640            self.location.clone(),
641            HydroNode::FlatMap {
642                f,
643                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
644                metadata: self
645                    .location
646                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
647            },
648        )
649    }
650
651    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
652    /// for the output type `U` to produce items in any order.
653    ///
654    /// # Example
655    /// ```rust
656    /// # #[cfg(feature = "deploy")] {
657    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
658    /// # use futures::StreamExt;
659    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
660    /// process
661    ///     .source_iter(q!(vec![
662    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
663    ///         std::collections::HashSet::from_iter(vec![3, 4]),
664    ///     ]))
665    ///     .flat_map_unordered(q!(|x| x))
666    /// # }, |mut stream| async move {
667    /// // 1, 2, 3, 4, but in no particular order
668    /// # let mut results = Vec::new();
669    /// # for w in (1..5) {
670    /// #     results.push(stream.next().await.unwrap());
671    /// # }
672    /// # results.sort();
673    /// # assert_eq!(results, vec![1, 2, 3, 4]);
674    /// # }));
675    /// # }
676    /// ```
677    pub fn flat_map_unordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
678        self,
679        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
680    ) -> Stream<U, L, B, NoOrder, R>
681    where
682        I: IntoIterator<Item = U>,
683        F: FnMut(T) -> I + 'a,
684        C: ValidMutCommutativityFor<F, T, I, O, WAS_MUT>,
685        Idemp: ValidMutIdempotenceFor<F, T, I, R, WAS_MUT>,
686    {
687        let f = crate::handoff_ref::with_ref_capture(|| {
688            let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
689            proof.register_proof(&expr);
690            expr.into()
691        });
692        Stream::new(
693            self.location.clone(),
694            HydroNode::FlatMap {
695                f,
696                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
697                metadata: self
698                    .location
699                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
700            },
701        )
702    }
703
704    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
705    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
706    ///
707    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
708    /// not deterministic, use [`Stream::flatten_unordered`] instead.
709    ///
710    /// ```rust
711    /// # #[cfg(feature = "deploy")] {
712    /// # use hydro_lang::prelude::*;
713    /// # use futures::StreamExt;
714    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
715    /// process
716    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
717    ///     .flatten_ordered()
718    /// # }, |mut stream| async move {
719    /// // 1, 2, 3, 4
720    /// # for w in (1..5) {
721    /// #     assert_eq!(stream.next().await.unwrap(), w);
722    /// # }
723    /// # }));
724    /// # }
725    /// ```
726    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
727    where
728        T: IntoIterator<Item = U>,
729    {
730        self.flat_map_ordered(q!(|d| d))
731    }
732
733    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
734    /// for the element type `T` to produce items in any order.
735    ///
736    /// # Example
737    /// ```rust
738    /// # #[cfg(feature = "deploy")] {
739    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
740    /// # use futures::StreamExt;
741    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
742    /// process
743    ///     .source_iter(q!(vec![
744    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
745    ///         std::collections::HashSet::from_iter(vec![3, 4]),
746    ///     ]))
747    ///     .flatten_unordered()
748    /// # }, |mut stream| async move {
749    /// // 1, 2, 3, 4, but in no particular order
750    /// # let mut results = Vec::new();
751    /// # for w in (1..5) {
752    /// #     results.push(stream.next().await.unwrap());
753    /// # }
754    /// # results.sort();
755    /// # assert_eq!(results, vec![1, 2, 3, 4]);
756    /// # }));
757    /// # }
758    /// ```
759    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
760    where
761        T: IntoIterator<Item = U>,
762    {
763        self.flat_map_unordered(q!(|d| d))
764    }
765
766    /// For each item in the input stream, apply `f` to produce a [`futures::stream::Stream`],
767    /// then emit the elements of that stream one by one. When the inner stream yields
768    /// `Pending`, this operator yields as well.
769    pub fn flat_map_stream_blocking<U, S, F, C, Idemp, const WAS_MUT: bool>(
770        self,
771        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
772    ) -> Stream<U, L, B, O, R>
773    where
774        S: futures::Stream<Item = U>,
775        F: FnMut(T) -> S + 'a,
776        C: ValidMutCommutativityFor<F, T, S, O, WAS_MUT>,
777        Idemp: ValidMutIdempotenceFor<F, T, S, R, WAS_MUT>,
778    {
779        let f = crate::handoff_ref::with_ref_capture(|| {
780            let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
781            proof.register_proof(&expr);
782            expr.into()
783        });
784        Stream::new(
785            self.location.clone(),
786            HydroNode::FlatMapStreamBlocking {
787                f,
788                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
789                metadata: self
790                    .location
791                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
792            },
793        )
794    }
795
796    /// For each item in the input stream, treat it as a [`futures::stream::Stream`] and
797    /// emit its elements one by one. When the inner stream yields `Pending`, this operator
798    /// yields as well.
799    pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
800    where
801        T: futures::Stream<Item = U>,
802    {
803        self.flat_map_stream_blocking(q!(|d| d))
804    }
805
806    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
807    /// `f`, preserving the order of the elements.
808    ///
809    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
810    /// not modify or take ownership of the values. If you need to modify the values while filtering
811    /// use [`Stream::filter_map`] instead.
812    ///
813    /// # Example
814    /// ```rust
815    /// # #[cfg(feature = "deploy")] {
816    /// # use hydro_lang::prelude::*;
817    /// # use futures::StreamExt;
818    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
819    /// process
820    ///     .source_iter(q!(vec![1, 2, 3, 4]))
821    ///     .filter(q!(|&x| x > 2))
822    /// # }, |mut stream| async move {
823    /// // 3, 4
824    /// # for w in (3..5) {
825    /// #     assert_eq!(stream.next().await.unwrap(), w);
826    /// # }
827    /// # }));
828    /// # }
829    /// ```
830    pub fn filter<F, C, Idemp, const WAS_MUT: bool>(
831        self,
832        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
833    ) -> Self
834    where
835        F: FnMut(&T) -> bool + 'a,
836        C: ValidMutBorrowCommutativityFor<F, T, bool, O, WAS_MUT>,
837        Idemp: ValidMutBorrowIdempotenceFor<F, T, bool, R, WAS_MUT>,
838    {
839        let f = crate::handoff_ref::with_ref_capture(|| {
840            let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location);
841            proof.register_proof(&expr);
842            expr.into()
843        });
844        Stream::new(
845            self.location.clone(),
846            HydroNode::Filter {
847                f,
848                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
849                metadata: self.location.new_node_metadata(Self::collection_kind()),
850            },
851        )
852    }
853
854    /// Splits the stream into two streams based on a predicate, without cloning elements.
855    ///
856    /// Elements for which `f` returns `true` are sent to the first output stream,
857    /// and elements for which `f` returns `false` are sent to the second output stream.
858    ///
859    /// Unlike using `filter` twice, this only evaluates the predicate once per element
860    /// and does not require `T: Clone`.
861    ///
862    /// The closure `f` receives a reference `&T` rather than an owned value `T` because
863    /// the predicate is only used for routing; the element itself is moved to the
864    /// appropriate output stream.
865    ///
866    /// # Example
867    /// ```rust
868    /// # #[cfg(feature = "deploy")] {
869    /// # use hydro_lang::prelude::*;
870    /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
871    /// # use futures::StreamExt;
872    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
873    /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
874    /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
875    /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
876    /// evens.map(q!(|x| (x, true)))
877    ///     .merge_unordered(odds.map(q!(|x| (x, false))))
878    /// # }, |mut stream| async move {
879    /// # let mut results = Vec::new();
880    /// # for _ in 0..6 {
881    /// #     results.push(stream.next().await.unwrap());
882    /// # }
883    /// # results.sort();
884    /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
885    /// # }));
886    /// # }
887    /// ```
888    pub fn partition<F, C, Idemp, const WAS_MUT: bool>(
889        self,
890        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
891    ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
892    where
893        F: FnMut(&T) -> bool + 'a,
894        C: ValidMutBorrowCommutativityFor<F, T, bool, O, WAS_MUT>,
895        Idemp: ValidMutBorrowIdempotenceFor<F, T, bool, R, WAS_MUT>,
896    {
897        let f = crate::handoff_ref::with_ref_capture(|| {
898            let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location);
899            proof.register_proof(&expr);
900            expr.into()
901        });
902        let shared = SharedNode(Rc::new(RefCell::new(
903            self.ir_node.replace(HydroNode::Placeholder),
904        )));
905
906        let true_stream = Stream::new(
907            self.location.clone(),
908            HydroNode::Partition {
909                inner: SharedNode(shared.0.clone()),
910                f: f.clone(),
911                is_true: true,
912                metadata: self.location.new_node_metadata(Self::collection_kind()),
913            },
914        );
915
916        let false_stream = Stream::new(
917            self.location.clone(),
918            HydroNode::Partition {
919                inner: SharedNode(shared.0),
920                f,
921                is_true: false,
922                metadata: self.location.new_node_metadata(Self::collection_kind()),
923            },
924        );
925
926        (true_stream, false_stream)
927    }
928
929    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
930    ///
931    /// # Example
932    /// ```rust
933    /// # #[cfg(feature = "deploy")] {
934    /// # use hydro_lang::prelude::*;
935    /// # use futures::StreamExt;
936    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
937    /// process
938    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
939    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
940    /// # }, |mut stream| async move {
941    /// // 1, 2
942    /// # for w in (1..3) {
943    /// #     assert_eq!(stream.next().await.unwrap(), w);
944    /// # }
945    /// # }));
946    /// # }
947    /// ```
948    pub fn filter_map<U, F, C, Idemp, const WAS_MUT: bool>(
949        self,
950        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
951    ) -> Stream<U, L, B, O, R>
952    where
953        F: FnMut(T) -> Option<U> + 'a,
954        C: ValidMutCommutativityFor<F, T, Option<U>, O, WAS_MUT>,
955        Idemp: ValidMutIdempotenceFor<F, T, Option<U>, R, WAS_MUT>,
956    {
957        let f = crate::handoff_ref::with_ref_capture(|| {
958            let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
959            proof.register_proof(&expr);
960            expr.into()
961        });
962        Stream::new(
963            self.location.clone(),
964            HydroNode::FilterMap {
965                f,
966                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
967                metadata: self
968                    .location
969                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
970            },
971        )
972    }
973
974    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
975    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
976    /// If `other` is an empty [`Optional`], no values will be produced.
977    ///
978    /// # Example
979    /// ```rust
980    /// # #[cfg(feature = "deploy")] {
981    /// # use hydro_lang::prelude::*;
982    /// # use futures::StreamExt;
983    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
984    /// let tick = process.tick();
985    /// let batch = process
986    ///   .source_iter(q!(vec![1, 2, 3, 4]))
987    ///   .batch(&tick, nondet!(/** test */));
988    /// let count = batch.clone().count(); // `count()` returns a singleton
989    /// batch.cross_singleton(count).all_ticks()
990    /// # }, |mut stream| async move {
991    /// // (1, 4), (2, 4), (3, 4), (4, 4)
992    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
993    /// #     assert_eq!(stream.next().await.unwrap(), w);
994    /// # }
995    /// # }));
996    /// # }
997    /// ```
998    pub fn cross_singleton<O2>(
999        self,
1000        other: impl Into<Optional<O2, L, Bounded>>,
1001    ) -> Stream<(T, O2), L, B, O, R>
1002    where
1003        O2: Clone,
1004    {
1005        let other: Optional<O2, L, Bounded> = other.into();
1006        check_matching_location(&self.location, &other.location);
1007
1008        Stream::new(
1009            self.location.clone(),
1010            HydroNode::CrossSingleton {
1011                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1012                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1013                metadata: self
1014                    .location
1015                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
1016            },
1017        )
1018    }
1019
1020    /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
1021    ///
1022    /// # Example
1023    /// ```rust
1024    /// # #[cfg(feature = "deploy")] {
1025    /// # use hydro_lang::prelude::*;
1026    /// # use futures::StreamExt;
1027    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1028    /// let tick = process.tick();
1029    /// // ticks are lazy by default, forces the second tick to run
1030    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1031    ///
1032    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
1033    /// let batch_first_tick = process
1034    ///   .source_iter(q!(vec![1, 2, 3, 4]))
1035    ///   .batch(&tick, nondet!(/** test */));
1036    /// let batch_second_tick = process
1037    ///   .source_iter(q!(vec![5, 6, 7, 8]))
1038    ///   .batch(&tick, nondet!(/** test */))
1039    ///   .defer_tick();
1040    /// batch_first_tick.chain(batch_second_tick)
1041    ///   .filter_if(signal)
1042    ///   .all_ticks()
1043    /// # }, |mut stream| async move {
1044    /// // [1, 2, 3, 4]
1045    /// # for w in vec![1, 2, 3, 4] {
1046    /// #     assert_eq!(stream.next().await.unwrap(), w);
1047    /// # }
1048    /// # }));
1049    /// # }
1050    /// ```
1051    pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
1052        self.cross_singleton(signal.filter(q!(|b| *b)))
1053            .map(q!(|(d, _)| d))
1054    }
1055
1056    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
1057    ///
1058    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
1059    /// leader of a cluster.
1060    ///
1061    /// # Example
1062    /// ```rust
1063    /// # #[cfg(feature = "deploy")] {
1064    /// # use hydro_lang::prelude::*;
1065    /// # use futures::StreamExt;
1066    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1067    /// let tick = process.tick();
1068    /// // ticks are lazy by default, forces the second tick to run
1069    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1070    ///
1071    /// let batch_first_tick = process
1072    ///   .source_iter(q!(vec![1, 2, 3, 4]))
1073    ///   .batch(&tick, nondet!(/** test */));
1074    /// let batch_second_tick = process
1075    ///   .source_iter(q!(vec![5, 6, 7, 8]))
1076    ///   .batch(&tick, nondet!(/** test */))
1077    ///   .defer_tick(); // appears on the second tick
1078    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1079    /// batch_first_tick.chain(batch_second_tick)
1080    ///   .filter_if_some(some_on_first_tick)
1081    ///   .all_ticks()
1082    /// # }, |mut stream| async move {
1083    /// // [1, 2, 3, 4]
1084    /// # for w in vec![1, 2, 3, 4] {
1085    /// #     assert_eq!(stream.next().await.unwrap(), w);
1086    /// # }
1087    /// # }));
1088    /// # }
1089    /// ```
1090    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1091    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1092        self.filter_if(signal.is_some())
1093    }
1094
1095    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
1096    ///
1097    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
1098    /// some local state.
1099    ///
1100    /// # Example
1101    /// ```rust
1102    /// # #[cfg(feature = "deploy")] {
1103    /// # use hydro_lang::prelude::*;
1104    /// # use futures::StreamExt;
1105    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1106    /// let tick = process.tick();
1107    /// // ticks are lazy by default, forces the second tick to run
1108    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1109    ///
1110    /// let batch_first_tick = process
1111    ///   .source_iter(q!(vec![1, 2, 3, 4]))
1112    ///   .batch(&tick, nondet!(/** test */));
1113    /// let batch_second_tick = process
1114    ///   .source_iter(q!(vec![5, 6, 7, 8]))
1115    ///   .batch(&tick, nondet!(/** test */))
1116    ///   .defer_tick(); // appears on the second tick
1117    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1118    /// batch_first_tick.chain(batch_second_tick)
1119    ///   .filter_if_none(some_on_first_tick)
1120    ///   .all_ticks()
1121    /// # }, |mut stream| async move {
1122    /// // [5, 6, 7, 8]
1123    /// # for w in vec![5, 6, 7, 8] {
1124    /// #     assert_eq!(stream.next().await.unwrap(), w);
1125    /// # }
1126    /// # }));
1127    /// # }
1128    /// ```
1129    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1130    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1131        self.filter_if(other.is_none())
1132    }
1133
1134    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams,
1135    /// returning all tupled pairs.
1136    ///
1137    /// When the right side is [`Bounded`], it is accumulated first and the left side streams
1138    /// through, preserving the left side's ordering. When both sides are [`Unbounded`], a
1139    /// symmetric hash join is used and ordering is [`NoOrder`].
1140    ///
1141    /// # Example
1142    /// ```rust
1143    /// # #[cfg(feature = "deploy")] {
1144    /// # use hydro_lang::prelude::*;
1145    /// # use std::collections::HashSet;
1146    /// # use futures::StreamExt;
1147    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1148    /// let tick = process.tick();
1149    /// let stream1 = process.source_iter(q!(vec![1, 2]));
1150    /// let stream2 = process.source_iter(q!(vec!['a', 'b']));
1151    /// stream1.cross_product(stream2)
1152    /// # }, |mut stream| async move {
1153    /// // (1, 'a'), (1, 'b'), (2, 'a'), (2, 'b') in any order
1154    /// # let expected = HashSet::from([(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]);
1155    /// # stream.map(|i| assert!(expected.contains(&i)));
1156    /// # }));
1157    /// # }
1158    pub fn cross_product<T2, B2: Boundedness, O2: Ordering, R2: Retries>(
1159        self,
1160        other: Stream<T2, L, B2, O2, R2>,
1161    ) -> Stream<(T, T2), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
1162    where
1163        T: Clone,
1164        T2: Clone,
1165        R: MinRetries<R2>,
1166    {
1167        self.map(q!(|v| ((), v)))
1168            .join(other.map(q!(|v| ((), v))))
1169            .map(q!(|((), (v1, v2))| (v1, v2)))
1170    }
1171
1172    /// Takes one stream as input and filters out any duplicate occurrences. The output
1173    /// contains all unique values from the input.
1174    ///
1175    /// # Example
1176    /// ```rust
1177    /// # #[cfg(feature = "deploy")] {
1178    /// # use hydro_lang::prelude::*;
1179    /// # use futures::StreamExt;
1180    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1181    /// let tick = process.tick();
1182    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
1183    /// # }, |mut stream| async move {
1184    /// # for w in vec![1, 2, 3, 4] {
1185    /// #     assert_eq!(stream.next().await.unwrap(), w);
1186    /// # }
1187    /// # }));
1188    /// # }
1189    /// ```
1190    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1191    where
1192        T: Eq + Hash,
1193    {
1194        Stream::new(
1195            self.location.clone(),
1196            HydroNode::Unique {
1197                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1198                metadata: self
1199                    .location
1200                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1201            },
1202        )
1203    }
1204
1205    /// Outputs everything in this stream that is *not* contained in the `other` stream.
1206    ///
1207    /// The `other` stream must be [`Bounded`], since this function will wait until
1208    /// all its elements are available before producing any output.
1209    /// # Example
1210    /// ```rust
1211    /// # #[cfg(feature = "deploy")] {
1212    /// # use hydro_lang::prelude::*;
1213    /// # use futures::StreamExt;
1214    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1215    /// let tick = process.tick();
1216    /// let stream = process
1217    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
1218    ///   .batch(&tick, nondet!(/** test */));
1219    /// let batch = process
1220    ///   .source_iter(q!(vec![1, 2]))
1221    ///   .batch(&tick, nondet!(/** test */));
1222    /// stream.filter_not_in(batch).all_ticks()
1223    /// # }, |mut stream| async move {
1224    /// # for w in vec![3, 4] {
1225    /// #     assert_eq!(stream.next().await.unwrap(), w);
1226    /// # }
1227    /// # }));
1228    /// # }
1229    /// ```
1230    pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1231    where
1232        T: Eq + Hash,
1233        B2: IsBounded,
1234    {
1235        check_matching_location(&self.location, &other.location);
1236
1237        Stream::new(
1238            self.location.clone(),
1239            HydroNode::Difference {
1240                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1241                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1242                metadata: self
1243                    .location
1244                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1245            },
1246        )
1247    }
1248
1249    /// An operator which allows you to "inspect" each element of a stream without
1250    /// modifying it. The closure `f` is called on a reference to each item. This is
1251    /// mainly useful for debugging, and should not be used to generate side-effects.
1252    ///
1253    /// # Example
1254    /// ```rust
1255    /// # #[cfg(feature = "deploy")] {
1256    /// # use hydro_lang::prelude::*;
1257    /// # use futures::StreamExt;
1258    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1259    /// let nums = process.source_iter(q!(vec![1, 2]));
1260    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1261    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1262    /// # }, |mut stream| async move {
1263    /// # for w in vec![1, 2] {
1264    /// #     assert_eq!(stream.next().await.unwrap(), w);
1265    /// # }
1266    /// # }));
1267    /// # }
1268    /// ```
1269    pub fn inspect<F, C, Idemp, const WAS_MUT: bool>(
1270        self,
1271        f: impl IntoQuotedMut<'a, F, L::DropConsistency, StreamMapFuncAlgebra<C, Idemp>>,
1272    ) -> Self
1273    where
1274        F: FnMut(&T) + 'a,
1275        C: ValidMutBorrowCommutativityFor<F, T, (), O, WAS_MUT>,
1276        Idemp: ValidMutBorrowIdempotenceFor<F, T, (), R, WAS_MUT>,
1277    {
1278        let f = crate::handoff_ref::with_ref_capture(|| {
1279            let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location.drop_consistency());
1280            proof.register_proof(&expr);
1281            expr.into()
1282        });
1283
1284        Stream::new(
1285            self.location.clone(),
1286            HydroNode::Inspect {
1287                f,
1288                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1289                metadata: self.location.new_node_metadata(Self::collection_kind()),
1290            },
1291        )
1292    }
1293
1294    /// Executes the provided closure for every element in this stream.
1295    ///
1296    /// If the stream is unordered or has retries, the closure must demonstrate commutativity
1297    /// and/or idempotence via annotations:
1298    /// ```rust,ignore
1299    /// stream.for_each(q!(
1300    ///     |x| *flag_mut |= x,
1301    ///     commutative = manual_proof!(/** boolean OR is commutative */),
1302    ///     idempotent = manual_proof!(/** boolean OR is idempotent */)
1303    /// ));
1304    /// ```
1305    ///
1306    /// On a `TotalOrder + ExactlyOnce` stream, no annotations are needed.
1307    ///
1308    /// The closure may capture singletons via `by_ref()` or `by_mut()`.
1309    pub fn for_each<F: FnMut(T) + 'a, C, I>(
1310        self,
1311        f: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, I>>,
1312    ) where
1313        C: ValidCommutativityFor<O>,
1314        I: ValidIdempotenceFor<R>,
1315    {
1316        let f = crate::handoff_ref::with_ref_capture(|| {
1317            let (f, proof) = f.splice_fnmut1_ctx_props(&self.location);
1318            proof.register_proof(&f);
1319            f.into()
1320        });
1321        self.location
1322            .flow_state()
1323            .borrow_mut()
1324            .push_root(HydroRoot::ForEach {
1325                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1326                f,
1327                op_metadata: HydroIrOpMetadata::new(),
1328            });
1329    }
1330
1331    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1332    /// TCP socket to some other server. You should _not_ use this API for interacting with
1333    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1334    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1335    /// interaction with asynchronous sinks.
1336    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1337    where
1338        O: IsOrdered,
1339        R: IsExactlyOnce,
1340        S: 'a + futures::Sink<T> + Unpin,
1341    {
1342        self.location
1343            .flow_state()
1344            .borrow_mut()
1345            .push_root(HydroRoot::DestSink {
1346                sink: sink.splice_typed_ctx(&self.location).into(),
1347                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1348                op_metadata: HydroIrOpMetadata::new(),
1349            });
1350    }
1351
1352    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1353    ///
1354    /// # Example
1355    /// ```rust
1356    /// # #[cfg(feature = "deploy")] {
1357    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1358    /// # use futures::StreamExt;
1359    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1360    /// let tick = process.tick();
1361    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1362    /// numbers.enumerate()
1363    /// # }, |mut stream| async move {
1364    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1365    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1366    /// #     assert_eq!(stream.next().await.unwrap(), w);
1367    /// # }
1368    /// # }));
1369    /// # }
1370    /// ```
1371    pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1372    where
1373        O: IsOrdered,
1374        R: IsExactlyOnce,
1375    {
1376        Stream::new(
1377            self.location.clone(),
1378            HydroNode::Enumerate {
1379                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1380                metadata: self.location.new_node_metadata(Stream::<
1381                    (usize, T),
1382                    L,
1383                    B,
1384                    TotalOrder,
1385                    ExactlyOnce,
1386                >::collection_kind()),
1387            },
1388        )
1389    }
1390
1391    /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1392    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1393    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1394    ///
1395    /// Depending on the input stream guarantees, the closure may need to be commutative
1396    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1397    ///
1398    /// # Example
1399    /// ```rust
1400    /// # #[cfg(feature = "deploy")] {
1401    /// # use hydro_lang::prelude::*;
1402    /// # use futures::StreamExt;
1403    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1404    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1405    /// words
1406    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1407    ///     .into_stream()
1408    /// # }, |mut stream| async move {
1409    /// // "HELLOWORLD"
1410    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1411    /// # }));
1412    /// # }
1413    /// ```
1414    pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
1415        self,
1416        init: impl IntoQuotedMut<'a, I, L>,
1417        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1418    ) -> Singleton<A, L, B2>
1419    where
1420        I: Fn() -> A + 'a,
1421        F: 'a + Fn(&mut A, T),
1422        C: ValidCommutativityFor<O>,
1423        Idemp: ValidIdempotenceFor<R>,
1424        B: ApplyMonotoneStream<M, B2>,
1425    {
1426        let init = init.splice_fn0_ctx(&self.location).into();
1427        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1428        proof.register_proof(&comb);
1429
1430        // Only assume_retries (for idempotence), not assume_ordering.
1431        // The fold hook in the simulator handles ordering non-determinism directly.
1432        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1433        let retried: Stream<T, L::DropConsistency, B, O, ExactlyOnce> = self.assume_retries(nondet);
1434
1435        let core = HydroNode::Fold {
1436            init,
1437            acc: comb.into(),
1438            input: Box::new(retried.ir_node.replace(HydroNode::Placeholder)),
1439            metadata: retried
1440                .location
1441                .new_node_metadata(Singleton::<A, L::DropConsistency, B2>::collection_kind()),
1442            // we do not guarantee consistency at this point because if the algebraic properties
1443            // do not hold in practice, replica consistency may fail to be maintained, so we
1444            // would like the simulator to assert consistency; in the future, this will be dynamic
1445            // based on the proof mechanism
1446        };
1447
1448        Singleton::new(retried.location.clone(), core)
1449            .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
1450    }
1451
1452    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1453    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1454    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1455    /// reference, so that it can be modified in place.
1456    ///
1457    /// Depending on the input stream guarantees, the closure may need to be commutative
1458    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1459    ///
1460    /// # Example
1461    /// ```rust
1462    /// # #[cfg(feature = "deploy")] {
1463    /// # use hydro_lang::prelude::*;
1464    /// # use futures::StreamExt;
1465    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1466    /// let bools = process.source_iter(q!(vec![false, true, false]));
1467    /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1468    /// # }, |mut stream| async move {
1469    /// // true
1470    /// # assert_eq!(stream.next().await.unwrap(), true);
1471    /// # }));
1472    /// # }
1473    /// ```
1474    pub fn reduce<F, C, Idemp>(
1475        self,
1476        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1477    ) -> Optional<T, L, B>
1478    where
1479        F: Fn(&mut T, T) + 'a,
1480        C: ValidCommutativityFor<O>,
1481        Idemp: ValidIdempotenceFor<R>,
1482    {
1483        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1484        proof.register_proof(&f);
1485
1486        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1487        let ordered_etc: Stream<T, L::DropConsistency, B> =
1488            self.assume_retries(nondet).assume_ordering(nondet);
1489
1490        let core = HydroNode::Reduce {
1491            f: f.into(),
1492            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1493            metadata: ordered_etc
1494                .location
1495                .new_node_metadata(Optional::<T, L::DropConsistency, B>::collection_kind()),
1496        };
1497
1498        Optional::new(ordered_etc.location.clone(), core)
1499            .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
1500    }
1501
1502    /// Computes the maximum element in the stream as an [`Optional`], which
1503    /// will be empty until the first element in the input arrives.
1504    ///
1505    /// # Example
1506    /// ```rust
1507    /// # #[cfg(feature = "deploy")] {
1508    /// # use hydro_lang::prelude::*;
1509    /// # use futures::StreamExt;
1510    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1511    /// let tick = process.tick();
1512    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1513    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1514    /// batch.max().all_ticks()
1515    /// # }, |mut stream| async move {
1516    /// // 4
1517    /// # assert_eq!(stream.next().await.unwrap(), 4);
1518    /// # }));
1519    /// # }
1520    /// ```
1521    pub fn max(self) -> Optional<T, L, B>
1522    where
1523        T: Ord,
1524    {
1525        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1526            .assume_ordering_trusted_bounded::<TotalOrder>(
1527                nondet!(/** max is commutative, but order affects intermediates */),
1528            )
1529            .reduce(q!(|curr, new| {
1530                if new > *curr {
1531                    *curr = new;
1532                }
1533            }))
1534    }
1535
1536    /// Computes the minimum element in the stream as an [`Optional`], which
1537    /// will be empty until the first element in the input arrives.
1538    ///
1539    /// # Example
1540    /// ```rust
1541    /// # #[cfg(feature = "deploy")] {
1542    /// # use hydro_lang::prelude::*;
1543    /// # use futures::StreamExt;
1544    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1545    /// let tick = process.tick();
1546    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1547    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1548    /// batch.min().all_ticks()
1549    /// # }, |mut stream| async move {
1550    /// // 1
1551    /// # assert_eq!(stream.next().await.unwrap(), 1);
1552    /// # }));
1553    /// # }
1554    /// ```
1555    pub fn min(self) -> Optional<T, L, B>
1556    where
1557        T: Ord,
1558    {
1559        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1560            .assume_ordering_trusted_bounded::<TotalOrder>(
1561                nondet!(/** max is commutative, but order affects intermediates */),
1562            )
1563            .reduce(q!(|curr, new| {
1564                if new < *curr {
1565                    *curr = new;
1566                }
1567            }))
1568    }
1569
1570    /// Computes the first element in the stream as an [`Optional`], which
1571    /// will be empty until the first element in the input arrives.
1572    ///
1573    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1574    /// re-ordering of elements may cause the first element to change.
1575    ///
1576    /// # Example
1577    /// ```rust
1578    /// # #[cfg(feature = "deploy")] {
1579    /// # use hydro_lang::prelude::*;
1580    /// # use futures::StreamExt;
1581    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1582    /// let tick = process.tick();
1583    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1584    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1585    /// batch.first().all_ticks()
1586    /// # }, |mut stream| async move {
1587    /// // 1
1588    /// # assert_eq!(stream.next().await.unwrap(), 1);
1589    /// # }));
1590    /// # }
1591    /// ```
1592    pub fn first(self) -> Optional<T, L, B>
1593    where
1594        O: IsOrdered,
1595    {
1596        self.make_totally_ordered()
1597            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1598            .generator(q!(|| ()), q!(|_, item| Generate::Return(item)))
1599            .reduce(q!(|_, _| {}))
1600    }
1601
1602    /// Computes the last element in the stream as an [`Optional`], which
1603    /// will be empty until an element in the input arrives.
1604    ///
1605    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1606    /// re-ordering of elements may cause the last element to change.
1607    ///
1608    /// # Example
1609    /// ```rust
1610    /// # #[cfg(feature = "deploy")] {
1611    /// # use hydro_lang::prelude::*;
1612    /// # use futures::StreamExt;
1613    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1614    /// let tick = process.tick();
1615    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1616    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1617    /// batch.last().all_ticks()
1618    /// # }, |mut stream| async move {
1619    /// // 4
1620    /// # assert_eq!(stream.next().await.unwrap(), 4);
1621    /// # }));
1622    /// # }
1623    /// ```
1624    pub fn last(self) -> Optional<T, L, B>
1625    where
1626        O: IsOrdered,
1627    {
1628        self.make_totally_ordered()
1629            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1630            .reduce(q!(|curr, new| *curr = new))
1631    }
1632
1633    /// Returns a stream containing at most the first `n` elements of the input stream,
1634    /// preserving the original order. Similar to `LIMIT` in SQL.
1635    ///
1636    /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
1637    /// retries, since the result depends on the order and cardinality of elements.
1638    ///
1639    /// # Example
1640    /// ```rust
1641    /// # #[cfg(feature = "deploy")] {
1642    /// # use hydro_lang::prelude::*;
1643    /// # use futures::StreamExt;
1644    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1645    /// let numbers = process.source_iter(q!(vec![10, 20, 30, 40, 50]));
1646    /// numbers.limit(q!(3))
1647    /// # }, |mut stream| async move {
1648    /// // 10, 20, 30
1649    /// # for w in vec![10, 20, 30] {
1650    /// #     assert_eq!(stream.next().await.unwrap(), w);
1651    /// # }
1652    /// # }));
1653    /// # }
1654    /// ```
1655    pub fn limit(
1656        self,
1657        n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1658    ) -> Stream<T, L, B, TotalOrder, ExactlyOnce>
1659    where
1660        O: IsOrdered,
1661        R: IsExactlyOnce,
1662    {
1663        self.generator(
1664            q!(|| 0usize),
1665            q!(move |count, item| {
1666                if *count == n {
1667                    Generate::Break
1668                } else {
1669                    *count += 1;
1670                    if *count == n {
1671                        Generate::Return(item)
1672                    } else {
1673                        Generate::Yield(item)
1674                    }
1675                }
1676            }),
1677        )
1678    }
1679
1680    /// Collects all the elements of this stream into a single [`Vec`] element.
1681    ///
1682    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1683    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1684    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1685    /// the vector at an arbitrary point in time.
1686    ///
1687    /// # Example
1688    /// ```rust
1689    /// # #[cfg(feature = "deploy")] {
1690    /// # use hydro_lang::prelude::*;
1691    /// # use futures::StreamExt;
1692    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1693    /// let tick = process.tick();
1694    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1695    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1696    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1697    /// # }, |mut stream| async move {
1698    /// // [ vec![1, 2, 3, 4] ]
1699    /// # for w in vec![vec![1, 2, 3, 4]] {
1700    /// #     assert_eq!(stream.next().await.unwrap(), w);
1701    /// # }
1702    /// # }));
1703    /// # }
1704    /// ```
1705    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1706    where
1707        O: IsOrdered,
1708        R: IsExactlyOnce,
1709    {
1710        self.make_totally_ordered().make_exactly_once().fold(
1711            q!(|| vec![]),
1712            q!(|acc, v| {
1713                acc.push(v);
1714            }),
1715        )
1716    }
1717
1718    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1719    /// and emitting each intermediate result.
1720    ///
1721    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1722    /// containing all intermediate accumulated values. The scan operation can also terminate early
1723    /// by returning `None`.
1724    ///
1725    /// The function takes a mutable reference to the accumulator and the current element, and returns
1726    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1727    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1728    ///
1729    /// # Examples
1730    ///
1731    /// Basic usage - running sum:
1732    /// ```rust
1733    /// # #[cfg(feature = "deploy")] {
1734    /// # use hydro_lang::prelude::*;
1735    /// # use futures::StreamExt;
1736    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1737    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1738    ///     q!(|| 0),
1739    ///     q!(|acc, x| {
1740    ///         *acc += x;
1741    ///         Some(*acc)
1742    ///     }),
1743    /// )
1744    /// # }, |mut stream| async move {
1745    /// // Output: 1, 3, 6, 10
1746    /// # for w in vec![1, 3, 6, 10] {
1747    /// #     assert_eq!(stream.next().await.unwrap(), w);
1748    /// # }
1749    /// # }));
1750    /// # }
1751    /// ```
1752    ///
1753    /// Early termination example:
1754    /// ```rust
1755    /// # #[cfg(feature = "deploy")] {
1756    /// # use hydro_lang::prelude::*;
1757    /// # use futures::StreamExt;
1758    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1759    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1760    ///     q!(|| 1),
1761    ///     q!(|state, x| {
1762    ///         *state = *state * x;
1763    ///         if *state > 6 {
1764    ///             None // Terminate the stream
1765    ///         } else {
1766    ///             Some(-*state)
1767    ///         }
1768    ///     }),
1769    /// )
1770    /// # }, |mut stream| async move {
1771    /// // Output: -1, -2, -6
1772    /// # for w in vec![-1, -2, -6] {
1773    /// #     assert_eq!(stream.next().await.unwrap(), w);
1774    /// # }
1775    /// # }));
1776    /// # }
1777    /// ```
1778    pub fn scan<A, U, I, F>(
1779        self,
1780        init: impl IntoQuotedMut<'a, I, L>,
1781        f: impl IntoQuotedMut<'a, F, L>,
1782    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1783    where
1784        O: IsOrdered,
1785        R: IsExactlyOnce,
1786        I: Fn() -> A + 'a,
1787        F: Fn(&mut A, T) -> Option<U> + 'a,
1788    {
1789        let init = init.splice_fn0_ctx(&self.location).into();
1790        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1791
1792        Stream::new(
1793            self.location.clone(),
1794            HydroNode::Scan {
1795                init,
1796                acc: f,
1797                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1798                metadata: self.location.new_node_metadata(
1799                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1800                ),
1801            },
1802        )
1803    }
1804
1805    /// Async version of [`Stream::scan`]. Applies an async function to each element of the
1806    /// stream, maintaining an internal state (accumulator) and emitting the values returned
1807    /// by the function.
1808    ///
1809    /// The closure runs synchronously (so it can mutate the accumulator), then returns a
1810    /// future. The future is polled to completion. If it resolves to `Some`, the value is
1811    /// emitted. If it resolves to `None`, the item is filtered out.
1812    ///
1813    /// # Examples
1814    ///
1815    /// ```rust
1816    /// # #[cfg(feature = "deploy")] {
1817    /// # use hydro_lang::prelude::*;
1818    /// # use futures::StreamExt;
1819    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1820    /// process
1821    ///     .source_iter(q!(vec![1, 2, 3, 4]))
1822    ///     .scan_async_blocking(
1823    ///         q!(|| 0),
1824    ///         q!(|acc, x| {
1825    ///             *acc += x;
1826    ///             let val = *acc;
1827    ///             async move { Some(val) }
1828    ///         }),
1829    ///     )
1830    /// # }, |mut stream| async move {
1831    /// // Output: 1, 3, 6, 10
1832    /// # for w in vec![1, 3, 6, 10] {
1833    /// #     assert_eq!(stream.next().await.unwrap(), w);
1834    /// # }
1835    /// # }));
1836    /// # }
1837    /// ```
1838    pub fn scan_async_blocking<A, U, I, F, Fut>(
1839        self,
1840        init: impl IntoQuotedMut<'a, I, L>,
1841        f: impl IntoQuotedMut<'a, F, L>,
1842    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1843    where
1844        O: IsOrdered,
1845        R: IsExactlyOnce,
1846        I: Fn() -> A + 'a,
1847        F: Fn(&mut A, T) -> Fut + 'a,
1848        Fut: Future<Output = Option<U>> + 'a,
1849    {
1850        let init = init.splice_fn0_ctx(&self.location).into();
1851        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1852
1853        Stream::new(
1854            self.location.clone(),
1855            HydroNode::ScanAsyncBlocking {
1856                init,
1857                acc: f,
1858                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1859                metadata: self.location.new_node_metadata(
1860                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1861                ),
1862            },
1863        )
1864    }
1865
1866    /// Iteratively processes the elements of the stream using a state machine that can yield
1867    /// elements as it processes its inputs. This is designed to mirror the unstable generator
1868    /// syntax in Rust, without requiring special syntax.
1869    ///
1870    /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
1871    /// state. The second argument defines the processing logic, taking in a mutable reference
1872    /// to the state and the value to be processed. It emits a [`Generate`] value, whose
1873    /// variants define what is emitted and whether further inputs should be processed.
1874    ///
1875    /// # Example
1876    /// ```rust
1877    /// # #[cfg(feature = "deploy")] {
1878    /// # use hydro_lang::prelude::*;
1879    /// # use futures::StreamExt;
1880    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1881    /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
1882    ///     q!(|| 0),
1883    ///     q!(|acc, x| {
1884    ///         *acc += x;
1885    ///         if *acc > 100 {
1886    ///             hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
1887    ///         } else if *acc % 2 == 0 {
1888    ///             hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
1889    ///         } else {
1890    ///             hydro_lang::live_collections::keyed_stream::Generate::Continue
1891    ///         }
1892    ///     }),
1893    /// )
1894    /// # }, |mut stream| async move {
1895    /// // Output: "even", "done!"
1896    /// # let mut results = Vec::new();
1897    /// # for _ in 0..2 {
1898    /// #     results.push(stream.next().await.unwrap());
1899    /// # }
1900    /// # results.sort();
1901    /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
1902    /// # }));
1903    /// # }
1904    /// ```
1905    pub fn generator<A, U, I, F>(
1906        self,
1907        init: impl IntoQuotedMut<'a, I, L> + Copy,
1908        f: impl IntoQuotedMut<'a, F, L> + Copy,
1909    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1910    where
1911        O: IsOrdered,
1912        R: IsExactlyOnce,
1913        I: Fn() -> A + 'a,
1914        F: Fn(&mut A, T) -> Generate<U> + 'a,
1915    {
1916        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1917        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1918
1919        let this = self.make_totally_ordered().make_exactly_once();
1920
1921        // State is Option<Option<A>>:
1922        //   None = not yet initialized
1923        //   Some(Some(a)) = active with state a
1924        //   Some(None) = terminated
1925        let scan_init = q!(|| None)
1926            .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
1927            .into();
1928        let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
1929            if state.is_none() {
1930                *state = Some(Some(init()));
1931            }
1932            match state {
1933                Some(Some(state_value)) => match f(state_value, v) {
1934                    Generate::Yield(out) => Some(Some(out)),
1935                    Generate::Return(out) => {
1936                        *state = Some(None);
1937                        Some(Some(out))
1938                    }
1939                    // Unlike KeyedStream, we can terminate the scan directly on
1940                    // Break/Return because there is only one state (no other keys
1941                    // that still need processing).
1942                    Generate::Break => None,
1943                    Generate::Continue => Some(None),
1944                },
1945                // State is Some(None) after Return; terminate the scan.
1946                _ => None,
1947            }
1948        })
1949        .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
1950        .into();
1951
1952        let scan_node = HydroNode::Scan {
1953            init: scan_init,
1954            acc: scan_f,
1955            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
1956            metadata: this.location.new_node_metadata(Stream::<
1957                Option<U>,
1958                L,
1959                B,
1960                TotalOrder,
1961                ExactlyOnce,
1962            >::collection_kind()),
1963        };
1964
1965        let flatten_f = q!(|d| d)
1966            .splice_fn1_ctx::<Option<U>, _>(&this.location)
1967            .into();
1968        let flatten_node = HydroNode::FlatMap {
1969            f: flatten_f,
1970            input: Box::new(scan_node),
1971            metadata: this
1972                .location
1973                .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
1974        };
1975
1976        Stream::new(this.location.clone(), flatten_node)
1977    }
1978
1979    /// Given a time interval, returns a stream corresponding to samples taken from the
1980    /// stream roughly at that interval. The output will have elements in the same order
1981    /// as the input, but with arbitrary elements skipped between samples. There is also
1982    /// no guarantee on the exact timing of the samples.
1983    ///
1984    /// # Non-Determinism
1985    /// The output stream is non-deterministic in which elements are sampled, since this
1986    /// is controlled by a clock.
1987    pub fn sample_every(
1988        self,
1989        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1990        nondet: NonDet,
1991    ) -> Stream<T, L::DropConsistency, Unbounded, O, AtLeastOnce>
1992    where
1993        L: TopLevel<'a>,
1994    {
1995        let samples = self.location.source_interval(interval);
1996
1997        let tick = self.location.tick();
1998        self.batch(&tick, nondet)
1999            .filter_if(samples.batch(&tick, nondet).first().is_some())
2000            .all_ticks()
2001            .weaken_retries()
2002    }
2003
2004    /// Given a timeout duration, returns an [`Optional`]  which will have a value if the
2005    /// stream has not emitted a value since that duration.
2006    ///
2007    /// # Non-Determinism
2008    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2009    /// samples take place, timeouts may be non-deterministically generated or missed,
2010    /// and the notification of the timeout may be delayed as well. There is also no
2011    /// guarantee on how long the [`Optional`] will have a value after the timeout is
2012    /// detected based on when the next sample is taken.
2013    pub fn timeout(
2014        self,
2015        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L::DropConsistency>> + Copy + 'a,
2016        nondet: NonDet,
2017    ) -> Optional<(), L::DropConsistency, Unbounded>
2018    where
2019        L: TopLevel<'a>,
2020    {
2021        let tick = self.location.tick();
2022
2023        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
2024            q!(|| None),
2025            q!(
2026                |latest, _| {
2027                    *latest = Some(Instant::now());
2028                },
2029                commutative = manual_proof!(/** TODO */)
2030            ),
2031        );
2032
2033        latest_received
2034            .snapshot(&tick, nondet)
2035            .filter_map(q!(move |latest_received| {
2036                if let Some(latest_received) = latest_received {
2037                    if Instant::now().duration_since(latest_received) > duration {
2038                        Some(())
2039                    } else {
2040                        None
2041                    }
2042                } else {
2043                    Some(())
2044                }
2045            }))
2046            .latest()
2047    }
2048
2049    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
2050    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2051    ///
2052    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2053    /// processed before an acknowledgement is emitted.
2054    pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
2055        let id = self.location.flow_state().borrow_mut().next_clock_id();
2056        let out_location = Atomic {
2057            tick: Tick {
2058                id,
2059                l: self.location.clone(),
2060            },
2061        };
2062        Stream::new(
2063            out_location.clone(),
2064            HydroNode::BeginAtomic {
2065                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2066                metadata: out_location
2067                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
2068            },
2069        )
2070    }
2071
2072    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2073    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2074    /// the order of the input. The output stream will execute in the [`Tick`] that was
2075    /// used to create the atomic section.
2076    ///
2077    /// # Non-Determinism
2078    /// The batch boundaries are non-deterministic and may change across executions.
2079    pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
2080        self,
2081        tick: &Tick<L2>,
2082        _nondet: NonDet,
2083    ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
2084        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2085        Stream::new(
2086            tick.drop_consistency(),
2087            HydroNode::Batch {
2088                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2089                metadata: tick
2090                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2091            },
2092        )
2093    }
2094
2095    /// An operator which allows you to "name" a `HydroNode`.
2096    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
2097    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
2098        {
2099            let mut node = self.ir_node.borrow_mut();
2100            let metadata = node.metadata_mut();
2101            metadata.tag = Some(name.to_owned());
2102        }
2103        self
2104    }
2105
2106    /// Turns this [`Stream`] into a [`Optional`], under the invariant assumption that there is at
2107    /// most one element. If this invariant is broken, the program may exhibit undefined behavior,
2108    /// so uses must be carefully vetted.
2109    pub(crate) fn cast_at_most_one_element(self) -> Optional<T, L, B>
2110    where
2111        B: IsBounded,
2112    {
2113        Optional::new(
2114            self.location.clone(),
2115            HydroNode::Cast {
2116                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2117                metadata: self
2118                    .location
2119                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
2120            },
2121        )
2122    }
2123
2124    pub(crate) fn use_ordering_type<O2: Ordering>(self) -> Stream<T, L, B, O2, R> {
2125        if O::ORDERING_KIND == O2::ORDERING_KIND {
2126            Stream::new(
2127                self.location.clone(),
2128                self.ir_node.replace(HydroNode::Placeholder),
2129            )
2130        } else {
2131            panic!(
2132                "Runtime ordering {:?} did not match requested cast {:?}.",
2133                O::ORDERING_KIND,
2134                O2::ORDERING_KIND
2135            )
2136        }
2137    }
2138
2139    /// Explicitly "casts" the stream to a type with a different ordering
2140    /// guarantee. Useful in unsafe code where the ordering cannot be proven
2141    /// by the type-system.
2142    ///
2143    /// # Non-Determinism
2144    /// This function is used as an escape hatch, and any mistakes in the
2145    /// provided ordering guarantee will propagate into the guarantees
2146    /// for the rest of the program.
2147    pub fn assume_ordering<O2: Ordering>(
2148        self,
2149        _nondet: NonDet,
2150    ) -> Stream<T, L::DropConsistency, B, O2, R> {
2151        if O::ORDERING_KIND == O2::ORDERING_KIND {
2152            self.use_ordering_type().weaken_consistency()
2153        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2154            // We can always weaken the ordering guarantee
2155            let target_location = self.location().drop_consistency();
2156            Stream::new(
2157                target_location.clone(),
2158                HydroNode::Cast {
2159                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2160                    metadata: target_location
2161                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2162                },
2163            )
2164        } else {
2165            let target_location = self.location().drop_consistency();
2166            Stream::new(
2167                target_location.clone(),
2168                HydroNode::ObserveNonDet {
2169                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2170                    trusted: false,
2171                    metadata: target_location
2172                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2173                },
2174            )
2175        }
2176    }
2177
2178    // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
2179    // intermediate states will not be revealed
2180    fn assume_ordering_trusted_bounded<O2: Ordering>(
2181        self,
2182        nondet: NonDet,
2183    ) -> Stream<T, L, B, O2, R> {
2184        if B::BOUNDED {
2185            self.assume_ordering_trusted(nondet)
2186        } else {
2187            let self_location = self.location.clone();
2188            let inner: Stream<T, L::DropConsistency, B, O2, R> = self.assume_ordering(nondet);
2189            Stream::new(self_location, inner.ir_node.replace(HydroNode::Placeholder))
2190        }
2191    }
2192
2193    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2194    // is not observable
2195    pub(crate) fn assume_ordering_trusted<O2: Ordering>(
2196        self,
2197        _nondet: NonDet,
2198    ) -> Stream<T, L, B, O2, R> {
2199        if O::ORDERING_KIND == O2::ORDERING_KIND {
2200            self.use_ordering_type()
2201        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2202            // We can always weaken the ordering guarantee
2203            Stream::new(
2204                self.location.clone(),
2205                HydroNode::Cast {
2206                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2207                    metadata: self
2208                        .location
2209                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2210                },
2211            )
2212        } else {
2213            Stream::new(
2214                self.location.clone(),
2215                HydroNode::ObserveNonDet {
2216                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2217                    trusted: true,
2218                    metadata: self
2219                        .location
2220                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2221                },
2222            )
2223        }
2224    }
2225
2226    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
2227    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
2228    /// which is always safe because that is the weakest possible guarantee.
2229    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
2230        self.weaken_ordering::<NoOrder>()
2231    }
2232
2233    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
2234    /// enforcing that `O2` is weaker than the input ordering guarantee.
2235    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
2236        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
2237        self.assume_ordering_trusted::<O2>(nondet)
2238    }
2239
2240    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
2241    /// implies that `O == TotalOrder`.
2242    pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
2243    where
2244        O: IsOrdered,
2245    {
2246        self.assume_ordering_trusted(nondet!(/** no-op */))
2247    }
2248
2249    /// Explicitly "casts" the stream to a type with a different retries
2250    /// guarantee. Useful in unsafe code where the lack of retries cannot
2251    /// be proven by the type-system.
2252    ///
2253    /// # Non-Determinism
2254    /// This function is used as an escape hatch, and any mistakes in the
2255    /// provided retries guarantee will propagate into the guarantees
2256    /// for the rest of the program.
2257    pub fn assume_retries<R2: Retries>(
2258        self,
2259        _nondet: NonDet,
2260    ) -> Stream<T, L::DropConsistency, B, O, R2> {
2261        if R::RETRIES_KIND == R2::RETRIES_KIND {
2262            Stream::new(
2263                self.location.drop_consistency(),
2264                self.ir_node.replace(HydroNode::Placeholder),
2265            )
2266        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2267            // We can always weaken the retries guarantee
2268            let target_location = self.location.drop_consistency();
2269            Stream::new(
2270                target_location.clone(),
2271                HydroNode::Cast {
2272                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2273                    metadata: target_location
2274                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2275                },
2276            )
2277        } else {
2278            let target_location = self.location.drop_consistency();
2279            Stream::new(
2280                target_location.clone(),
2281                HydroNode::ObserveNonDet {
2282                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2283                    trusted: false,
2284                    metadata: target_location
2285                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2286                },
2287            )
2288        }
2289    }
2290
2291    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2292    // is not observable
2293    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
2294        if R::RETRIES_KIND == R2::RETRIES_KIND {
2295            Stream::new(
2296                self.location.clone(),
2297                self.ir_node.replace(HydroNode::Placeholder),
2298            )
2299        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2300            // We can always weaken the retries guarantee
2301            Stream::new(
2302                self.location.clone(),
2303                HydroNode::Cast {
2304                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2305                    metadata: self
2306                        .location
2307                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2308                },
2309            )
2310        } else {
2311            Stream::new(
2312                self.location.clone(),
2313                HydroNode::ObserveNonDet {
2314                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2315                    trusted: true,
2316                    metadata: self
2317                        .location
2318                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2319                },
2320            )
2321        }
2322    }
2323
2324    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
2325    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
2326    /// which is always safe because that is the weakest possible guarantee.
2327    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
2328        self.weaken_retries::<AtLeastOnce>()
2329    }
2330
2331    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
2332    /// enforcing that `R2` is weaker than the input retries guarantee.
2333    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
2334        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
2335        self.assume_retries_trusted::<R2>(nondet)
2336    }
2337
2338    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
2339    /// implies that `R == ExactlyOnce`.
2340    pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
2341    where
2342        R: IsExactlyOnce,
2343    {
2344        self.assume_retries_trusted(nondet!(/** no-op */))
2345    }
2346
2347    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
2348    /// implies that `B == Bounded`.
2349    pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
2350    where
2351        B: IsBounded,
2352    {
2353        self.weaken_boundedness()
2354    }
2355
2356    /// Weakens the boundedness guarantee to an arbitrary boundedness `B2`, given that `B: IsBounded`,
2357    /// which implies that `B == Bounded`.
2358    pub fn weaken_boundedness<B2: Boundedness>(self) -> Stream<T, L, B2, O, R> {
2359        if B::BOUNDED == B2::BOUNDED {
2360            Stream::new(
2361                self.location.clone(),
2362                self.ir_node.replace(HydroNode::Placeholder),
2363            )
2364        } else {
2365            // We can always weaken the boundedness
2366            Stream::new(
2367                self.location.clone(),
2368                HydroNode::Cast {
2369                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2370                    metadata: self
2371                        .location
2372                        .new_node_metadata(Stream::<T, L, B2, O, R>::collection_kind()),
2373                },
2374            )
2375        }
2376    }
2377}
2378
2379impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
2380where
2381    L: Location<'a>,
2382{
2383    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
2384    ///
2385    /// # Example
2386    /// ```rust
2387    /// # #[cfg(feature = "deploy")] {
2388    /// # use hydro_lang::prelude::*;
2389    /// # use futures::StreamExt;
2390    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2391    /// process.source_iter(q!(&[1, 2, 3])).cloned()
2392    /// # }, |mut stream| async move {
2393    /// // 1, 2, 3
2394    /// # for w in vec![1, 2, 3] {
2395    /// #     assert_eq!(stream.next().await.unwrap(), w);
2396    /// # }
2397    /// # }));
2398    /// # }
2399    /// ```
2400    pub fn cloned(self) -> Stream<T, L, B, O, R>
2401    where
2402        T: Clone,
2403    {
2404        self.map(q!(|d| d.clone()))
2405    }
2406}
2407
2408impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2409where
2410    L: Location<'a>,
2411{
2412    /// Computes the number of elements in the stream as a [`Singleton`].
2413    ///
2414    /// # Example
2415    /// ```rust
2416    /// # #[cfg(feature = "deploy")] {
2417    /// # use hydro_lang::prelude::*;
2418    /// # use futures::StreamExt;
2419    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2420    /// let tick = process.tick();
2421    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2422    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2423    /// batch.count().all_ticks()
2424    /// # }, |mut stream| async move {
2425    /// // 4
2426    /// # assert_eq!(stream.next().await.unwrap(), 4);
2427    /// # }));
2428    /// # }
2429    /// ```
2430    pub fn count(self) -> Singleton<usize, L, B::StreamToMonotone> {
2431        self.assume_ordering_trusted::<TotalOrder>(nondet!(
2432            /// Order does not affect eventual count, and also does not affect intermediate states.
2433        ))
2434        .fold(
2435            q!(|| 0usize),
2436            q!(
2437                |count, _| *count += 1,
2438                monotone = manual_proof!(/** += 1 is monotone */)
2439            ),
2440        )
2441    }
2442}
2443
2444impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2445    /// Produces a new stream that merges the elements of the two input streams.
2446    /// The result has [`NoOrder`] because the order of merging is not guaranteed.
2447    ///
2448    /// Currently, both input streams must be [`Unbounded`]. When the streams are
2449    /// [`Bounded`], you can use [`Stream::chain`] instead.
2450    ///
2451    /// # Example
2452    /// ```rust
2453    /// # #[cfg(feature = "deploy")] {
2454    /// # use hydro_lang::prelude::*;
2455    /// # use futures::StreamExt;
2456    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2457    /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
2458    /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
2459    /// numbers.clone().map(q!(|x| x + 1)).merge_unordered(numbers)
2460    /// # }, |mut stream| async move {
2461    /// // 2, 3, 4, 5, and 1, 2, 3, 4 merged in unknown order
2462    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2463    /// #     assert_eq!(stream.next().await.unwrap(), w);
2464    /// # }
2465    /// # }));
2466    /// # }
2467    /// ```
2468    pub fn merge_unordered<O2: Ordering, R2: Retries>(
2469        self,
2470        other: Stream<T, L, Unbounded, O2, R2>,
2471    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2472    where
2473        R: MinRetries<R2>,
2474    {
2475        Stream::new(
2476            self.location.clone(),
2477            HydroNode::Chain {
2478                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2479                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2480                metadata: self.location.new_node_metadata(Stream::<
2481                    T,
2482                    L,
2483                    Unbounded,
2484                    NoOrder,
2485                    <R as MinRetries<R2>>::Min,
2486                >::collection_kind()),
2487            },
2488        )
2489    }
2490
2491    /// Deprecated: use [`Stream::merge_unordered`] instead.
2492    #[deprecated(note = "use `merge_unordered` instead")]
2493    pub fn interleave<O2: Ordering, R2: Retries>(
2494        self,
2495        other: Stream<T, L, Unbounded, O2, R2>,
2496    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2497    where
2498        R: MinRetries<R2>,
2499    {
2500        self.merge_unordered(other)
2501    }
2502}
2503
2504impl<'a, T, L: Location<'a>, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R> {
2505    /// Produces a new stream that combines the elements of the two input streams,
2506    /// preserving the relative order of elements within each input.
2507    ///
2508    /// # Non-Determinism
2509    /// The order in which elements *across* the two streams will be interleaved is
2510    /// non-deterministic, so the order of elements will vary across runs. If the output
2511    /// order is irrelevant, use [`Stream::merge_unordered`] instead, which is deterministic
2512    /// but emits an unordered stream. For deterministic first-then-second ordering on
2513    /// bounded streams, use [`Stream::chain`].
2514    ///
2515    /// # Example
2516    /// ```rust
2517    /// # #[cfg(feature = "deploy")] {
2518    /// # use hydro_lang::prelude::*;
2519    /// # use futures::StreamExt;
2520    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2521    /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
2522    /// # process.source_iter(q!(vec![1, 3])).into();
2523    /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
2524    /// # }, |mut stream| async move {
2525    /// // 1, 3 and 2, 4 in some order, preserving the original local order
2526    /// # for w in vec![1, 3, 2, 4] {
2527    /// #     assert_eq!(stream.next().await.unwrap(), w);
2528    /// # }
2529    /// # }));
2530    /// # }
2531    /// ```
2532    pub fn merge_ordered<R2: Retries>(
2533        self,
2534        other: Stream<T, L, B, TotalOrder, R2>,
2535        _nondet: NonDet,
2536    ) -> Stream<T, L::DropConsistency, B, TotalOrder, <R as MinRetries<R2>>::Min>
2537    where
2538        R: MinRetries<R2>,
2539    {
2540        let target_location = self.location().drop_consistency();
2541        Stream::new(
2542            target_location.clone(),
2543            HydroNode::MergeOrdered {
2544                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2545                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2546                metadata: target_location.new_node_metadata(Stream::<
2547                    T,
2548                    L::DropConsistency,
2549                    B,
2550                    TotalOrder,
2551                    <R as MinRetries<R2>>::Min,
2552                >::collection_kind()),
2553            },
2554        )
2555    }
2556}
2557
2558impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2559where
2560    L: Location<'a>,
2561{
2562    /// Produces a new stream that emits the input elements in sorted order.
2563    ///
2564    /// The input stream can have any ordering guarantee, but the output stream
2565    /// will have a [`TotalOrder`] guarantee. This operator will block until all
2566    /// elements in the input stream are available, so it requires the input stream
2567    /// to be [`Bounded`].
2568    ///
2569    /// # Example
2570    /// ```rust
2571    /// # #[cfg(feature = "deploy")] {
2572    /// # use hydro_lang::prelude::*;
2573    /// # use futures::StreamExt;
2574    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2575    /// let tick = process.tick();
2576    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
2577    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2578    /// batch.sort().all_ticks()
2579    /// # }, |mut stream| async move {
2580    /// // 1, 2, 3, 4
2581    /// # for w in (1..5) {
2582    /// #     assert_eq!(stream.next().await.unwrap(), w);
2583    /// # }
2584    /// # }));
2585    /// # }
2586    /// ```
2587    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2588    where
2589        B: IsBounded,
2590        T: Ord,
2591    {
2592        let this = self.make_bounded();
2593        Stream::new(
2594            this.location.clone(),
2595            HydroNode::Sort {
2596                input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2597                metadata: this
2598                    .location
2599                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2600            },
2601        )
2602    }
2603
2604    /// Produces a new stream that first emits the elements of the `self` stream,
2605    /// and then emits the elements of the `other` stream. The output stream has
2606    /// a [`TotalOrder`] guarantee if and only if both input streams have a
2607    /// [`TotalOrder`] guarantee.
2608    ///
2609    /// Currently, both input streams must be [`Bounded`]. This operator will block
2610    /// on the first stream until all its elements are available. In a future version,
2611    /// we will relax the requirement on the `other` stream.
2612    ///
2613    /// # Example
2614    /// ```rust
2615    /// # #[cfg(feature = "deploy")] {
2616    /// # use hydro_lang::prelude::*;
2617    /// # use futures::StreamExt;
2618    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2619    /// let tick = process.tick();
2620    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2621    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2622    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2623    /// # }, |mut stream| async move {
2624    /// // 2, 3, 4, 5, 1, 2, 3, 4
2625    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2626    /// #     assert_eq!(stream.next().await.unwrap(), w);
2627    /// # }
2628    /// # }));
2629    /// # }
2630    /// ```
2631    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2632        self,
2633        other: Stream<T, L, B2, O2, R2>,
2634    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2635    where
2636        B: IsBounded,
2637        O: MinOrder<O2>,
2638        R: MinRetries<R2>,
2639    {
2640        check_matching_location(&self.location, &other.location);
2641
2642        Stream::new(
2643            self.location.clone(),
2644            HydroNode::Chain {
2645                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2646                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2647                metadata: self.location.new_node_metadata(Stream::<
2648                    T,
2649                    L,
2650                    B2,
2651                    <O as MinOrder<O2>>::Min,
2652                    <R as MinRetries<R2>>::Min,
2653                >::collection_kind()),
2654            },
2655        )
2656    }
2657
2658    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
2659    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
2660    /// because this is compiled into a nested loop.
2661    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>, R2: Retries>(
2662        self,
2663        other: Stream<T2, L, Bounded, O2, R2>,
2664    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, <R as MinRetries<R2>>::Min>
2665    where
2666        B: IsBounded,
2667        T: Clone,
2668        T2: Clone,
2669        R: MinRetries<R2>,
2670    {
2671        let this = self.make_bounded();
2672        check_matching_location(&this.location, &other.location);
2673
2674        Stream::new(
2675            this.location.clone(),
2676            HydroNode::CrossProduct {
2677                left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2678                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2679                metadata: this.location.new_node_metadata(Stream::<
2680                    (T, T2),
2681                    L,
2682                    Bounded,
2683                    <O2 as MinOrder<O>>::Min,
2684                    <R as MinRetries<R2>>::Min,
2685                >::collection_kind()),
2686            },
2687        )
2688    }
2689
2690    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
2691    /// `self` used as the values for *each* key.
2692    ///
2693    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
2694    /// values. For example, it can be used to send the same set of elements to several cluster
2695    /// members, if the membership information is available as a [`KeyedSingleton`].
2696    ///
2697    /// # Example
2698    /// ```rust
2699    /// # #[cfg(feature = "deploy")] {
2700    /// # use hydro_lang::prelude::*;
2701    /// # use futures::StreamExt;
2702    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2703    /// # let tick = process.tick();
2704    /// let keyed_singleton = // { 1: (), 2: () }
2705    /// # process
2706    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
2707    /// #     .into_keyed()
2708    /// #     .batch(&tick, nondet!(/** test */))
2709    /// #     .first();
2710    /// let stream = // [ "a", "b" ]
2711    /// # process
2712    /// #     .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2713    /// #     .batch(&tick, nondet!(/** test */));
2714    /// stream.repeat_with_keys(keyed_singleton)
2715    /// # .entries().all_ticks()
2716    /// # }, |mut stream| async move {
2717    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2718    /// # let mut results = Vec::new();
2719    /// # for _ in 0..4 {
2720    /// #     results.push(stream.next().await.unwrap());
2721    /// # }
2722    /// # results.sort();
2723    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2724    /// # }));
2725    /// # }
2726    /// ```
2727    pub fn repeat_with_keys<K, V2>(
2728        self,
2729        keys: KeyedSingleton<K, V2, L, Bounded>,
2730    ) -> KeyedStream<K, T, L, Bounded, O, R>
2731    where
2732        B: IsBounded,
2733        K: Clone,
2734        T: Clone,
2735    {
2736        keys.keys()
2737            .assume_ordering_trusted::<TotalOrder>(
2738                nondet!(/** keyed stream does not depend on ordering of keys */),
2739            )
2740            .cross_product_nested_loop(self.make_bounded())
2741            .into_keyed()
2742    }
2743
2744    /// Consumes a stream of `Future<T>`, resolving each future while blocking subgraph
2745    /// execution until all results are available. The output order is based on when futures
2746    /// complete, and may be different than the input order.
2747    ///
2748    /// Unlike [`Stream::resolve_futures`], which allows the subgraph to continue executing
2749    /// while futures are pending, this variant blocks until the futures resolve.
2750    ///
2751    /// # Example
2752    /// ```rust
2753    /// # #[cfg(feature = "deploy")] {
2754    /// # use std::collections::HashSet;
2755    /// # use futures::StreamExt;
2756    /// # use hydro_lang::prelude::*;
2757    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2758    /// process
2759    ///     .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2760    ///     .map(q!(|x| async move {
2761    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2762    ///         x
2763    ///     }))
2764    ///     .resolve_futures_blocking()
2765    /// #   },
2766    /// #   |mut stream| async move {
2767    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2768    /// #       let mut output = HashSet::new();
2769    /// #       for _ in 1..10 {
2770    /// #           output.insert(stream.next().await.unwrap());
2771    /// #       }
2772    /// #       assert_eq!(
2773    /// #           output,
2774    /// #           HashSet::<i32>::from_iter(1..10)
2775    /// #       );
2776    /// #   },
2777    /// # ));
2778    /// # }
2779    /// ```
2780    pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2781    where
2782        T: Future,
2783    {
2784        Stream::new(
2785            self.location.clone(),
2786            HydroNode::ResolveFuturesBlocking {
2787                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2788                metadata: self
2789                    .location
2790                    .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2791            },
2792        )
2793    }
2794
2795    /// Returns a [`Singleton`] containing `true` if the stream has no elements, or `false` otherwise.
2796    ///
2797    /// # Example
2798    /// ```rust
2799    /// # #[cfg(feature = "deploy")] {
2800    /// # use hydro_lang::prelude::*;
2801    /// # use futures::StreamExt;
2802    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2803    /// let tick = process.tick();
2804    /// let empty: Stream<i32, _, Bounded> = process
2805    ///   .source_iter(q!(Vec::<i32>::new()))
2806    ///   .batch(&tick, nondet!(/** test */));
2807    /// empty.is_empty().all_ticks()
2808    /// # }, |mut stream| async move {
2809    /// // true
2810    /// # assert_eq!(stream.next().await.unwrap(), true);
2811    /// # }));
2812    /// # }
2813    /// ```
2814    #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
2815    pub fn is_empty(self) -> Singleton<bool, L, Bounded>
2816    where
2817        B: IsBounded,
2818    {
2819        self.make_bounded()
2820            .assume_ordering_trusted::<TotalOrder>(
2821                nondet!(/** is_empty intermediates unaffected by order */),
2822            )
2823            .first()
2824            .is_none()
2825    }
2826}
2827
2828impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2829where
2830    L: Location<'a>,
2831{
2832    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2833    /// by equi-joining the two streams on the key attribute `K`.
2834    ///
2835    /// When the right-hand side is [`Bounded`], the join accumulates the right side first
2836    /// and streams the left side through, preserving the left side's ordering. When both
2837    /// sides are [`Unbounded`], a symmetric hash join is used and ordering is [`NoOrder`].
2838    ///
2839    /// # Example
2840    /// ```rust
2841    /// # #[cfg(feature = "deploy")] {
2842    /// # use hydro_lang::prelude::*;
2843    /// # use std::collections::HashSet;
2844    /// # use futures::StreamExt;
2845    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2846    /// let tick = process.tick();
2847    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2848    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2849    /// stream1.join(stream2)
2850    /// # }, |mut stream| async move {
2851    /// // (1, ('a', 'x')), (2, ('b', 'y'))
2852    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2853    /// # stream.map(|i| assert!(expected.contains(&i)));
2854    /// # }));
2855    /// # }
2856    pub fn join<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2857        self,
2858        n: Stream<(K, V2), L, B2, O2, R2>,
2859    ) -> Stream<(K, (V1, V2)), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
2860    where
2861        K: Eq + Hash + Clone,
2862        R: MinRetries<R2>,
2863        V1: Clone,
2864        V2: Clone,
2865    {
2866        check_matching_location(&self.location, &n.location);
2867
2868        let ir_node = if B2::BOUNDED {
2869            HydroNode::JoinHalf {
2870                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2871                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2872                metadata: self.location.new_node_metadata(Stream::<
2873                    (K, (V1, V2)),
2874                    L,
2875                    B,
2876                    B2::PreserveOrderIfBounded<O>,
2877                    <R as MinRetries<R2>>::Min,
2878                >::collection_kind()),
2879            }
2880        } else {
2881            HydroNode::Join {
2882                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2883                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2884                metadata: self.location.new_node_metadata(Stream::<
2885                    (K, (V1, V2)),
2886                    L,
2887                    B,
2888                    B2::PreserveOrderIfBounded<O>,
2889                    <R as MinRetries<R2>>::Min,
2890                >::collection_kind()),
2891            }
2892        };
2893
2894        Stream::new(self.location.clone(), ir_node)
2895    }
2896
2897    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2898    /// computes the anti-join of the items in the input -- i.e. returns
2899    /// unique items in the first input that do not have a matching key
2900    /// in the second input.
2901    ///
2902    /// # Example
2903    /// ```rust
2904    /// # #[cfg(feature = "deploy")] {
2905    /// # use hydro_lang::prelude::*;
2906    /// # use futures::StreamExt;
2907    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2908    /// let tick = process.tick();
2909    /// let stream = process
2910    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2911    ///   .batch(&tick, nondet!(/** test */));
2912    /// let batch = process
2913    ///   .source_iter(q!(vec![1, 2]))
2914    ///   .batch(&tick, nondet!(/** test */));
2915    /// stream.anti_join(batch).all_ticks()
2916    /// # }, |mut stream| async move {
2917    /// # for w in vec![(3, 'c'), (4, 'd')] {
2918    /// #     assert_eq!(stream.next().await.unwrap(), w);
2919    /// # }
2920    /// # }));
2921    /// # }
2922    pub fn anti_join<O2: Ordering, R2: Retries>(
2923        self,
2924        n: Stream<K, L, Bounded, O2, R2>,
2925    ) -> Stream<(K, V1), L, B, O, R>
2926    where
2927        K: Eq + Hash,
2928    {
2929        check_matching_location(&self.location, &n.location);
2930
2931        Stream::new(
2932            self.location.clone(),
2933            HydroNode::AntiJoin {
2934                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2935                neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2936                metadata: self
2937                    .location
2938                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2939            },
2940        )
2941    }
2942}
2943
2944impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2945    Stream<(K, V), L, B, O, R>
2946{
2947    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2948    /// is used as the key and the second element is added to the entries associated with that key.
2949    ///
2950    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2951    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2952    /// performing grouped aggregations, but also for more precise ordering guarantees such as
2953    /// total ordering _within_ each group but no ordering _across_ groups.
2954    ///
2955    /// # Example
2956    /// ```rust
2957    /// # #[cfg(feature = "deploy")] {
2958    /// # use hydro_lang::prelude::*;
2959    /// # use futures::StreamExt;
2960    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2961    /// process
2962    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2963    ///     .into_keyed()
2964    /// #   .entries()
2965    /// # }, |mut stream| async move {
2966    /// // { 1: [2, 3], 2: [4] }
2967    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2968    /// #     assert_eq!(stream.next().await.unwrap(), w);
2969    /// # }
2970    /// # }));
2971    /// # }
2972    /// ```
2973    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2974        KeyedStream::new(
2975            self.location.clone(),
2976            HydroNode::Cast {
2977                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2978                metadata: self
2979                    .location
2980                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2981            },
2982        )
2983    }
2984}
2985
2986impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2987where
2988    K: Eq + Hash,
2989    L: Location<'a>,
2990{
2991    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2992    /// # Example
2993    /// ```rust
2994    /// # #[cfg(feature = "deploy")] {
2995    /// # use hydro_lang::prelude::*;
2996    /// # use futures::StreamExt;
2997    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2998    /// let tick = process.tick();
2999    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
3000    /// let batch = numbers.batch(&tick, nondet!(/** test */));
3001    /// batch.keys().all_ticks()
3002    /// # }, |mut stream| async move {
3003    /// // 1, 2
3004    /// # assert_eq!(stream.next().await.unwrap(), 1);
3005    /// # assert_eq!(stream.next().await.unwrap(), 2);
3006    /// # }));
3007    /// # }
3008    /// ```
3009    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
3010        self.into_keyed()
3011            .fold(
3012                q!(|| ()),
3013                q!(
3014                    |_, _| {},
3015                    commutative = manual_proof!(/** values are ignored */),
3016                    idempotent = manual_proof!(/** values are ignored */)
3017                ),
3018            )
3019            .keys()
3020    }
3021}
3022
3023impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
3024where
3025    L: Location<'a>,
3026{
3027    /// Returns a stream corresponding to the latest batch of elements being atomically
3028    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
3029    /// the order of the input.
3030    ///
3031    /// # Non-Determinism
3032    /// The batch boundaries are non-deterministic and may change across executions.
3033    pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
3034        self,
3035        tick: &Tick<L2>,
3036        _nondet: NonDet,
3037    ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
3038        Stream::new(
3039            tick.drop_consistency(),
3040            HydroNode::Batch {
3041                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3042                metadata: tick
3043                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3044            },
3045        )
3046    }
3047
3048    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
3049    /// See [`Stream::atomic`] for more details.
3050    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
3051        Stream::new(
3052            self.location.tick.l.clone(),
3053            HydroNode::EndAtomic {
3054                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3055                metadata: self
3056                    .location
3057                    .tick
3058                    .l
3059                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
3060            },
3061        )
3062    }
3063}
3064
3065impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
3066where
3067    L: TopLevel<'a>,
3068    F: Future<Output = T>,
3069{
3070    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
3071    /// Future outputs are produced as available, regardless of input arrival order.
3072    ///
3073    /// # Example
3074    /// ```rust
3075    /// # #[cfg(feature = "deploy")] {
3076    /// # use std::collections::HashSet;
3077    /// # use futures::StreamExt;
3078    /// # use hydro_lang::prelude::*;
3079    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3080    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
3081    ///     .map(q!(|x| async move {
3082    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
3083    ///         x
3084    ///     }))
3085    ///     .resolve_futures()
3086    /// #   },
3087    /// #   |mut stream| async move {
3088    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
3089    /// #       let mut output = HashSet::new();
3090    /// #       for _ in 1..10 {
3091    /// #           output.insert(stream.next().await.unwrap());
3092    /// #       }
3093    /// #       assert_eq!(
3094    /// #           output,
3095    /// #           HashSet::<i32>::from_iter(1..10)
3096    /// #       );
3097    /// #   },
3098    /// # ));
3099    /// # }
3100    pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
3101        Stream::new(
3102            self.location.clone(),
3103            HydroNode::ResolveFutures {
3104                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3105                metadata: self
3106                    .location
3107                    .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
3108            },
3109        )
3110    }
3111
3112    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
3113    /// Future outputs are produced in the same order as the input stream.
3114    ///
3115    /// # Example
3116    /// ```rust
3117    /// # #[cfg(feature = "deploy")] {
3118    /// # use std::collections::HashSet;
3119    /// # use futures::StreamExt;
3120    /// # use hydro_lang::prelude::*;
3121    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3122    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
3123    ///     .map(q!(|x| async move {
3124    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
3125    ///         x
3126    ///     }))
3127    ///     .resolve_futures_ordered()
3128    /// #   },
3129    /// #   |mut stream| async move {
3130    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
3131    /// #       let mut output = Vec::new();
3132    /// #       for _ in 1..10 {
3133    /// #           output.push(stream.next().await.unwrap());
3134    /// #       }
3135    /// #       assert_eq!(
3136    /// #           output,
3137    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
3138    /// #       );
3139    /// #   },
3140    /// # ));
3141    /// # }
3142    pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
3143        Stream::new(
3144            self.location.clone(),
3145            HydroNode::ResolveFuturesOrdered {
3146                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3147                metadata: self
3148                    .location
3149                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3150            },
3151        )
3152    }
3153}
3154
3155impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
3156where
3157    L: Location<'a>,
3158{
3159    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
3160    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
3161    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
3162        Stream::new(
3163            self.location.outer().clone(),
3164            HydroNode::YieldConcat {
3165                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3166                metadata: self
3167                    .location
3168                    .outer()
3169                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3170            },
3171        )
3172    }
3173
3174    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
3175    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
3176    ///
3177    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
3178    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
3179    /// stream's [`Tick`] context.
3180    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
3181        let out_location = Atomic {
3182            tick: self.location.clone(),
3183        };
3184
3185        Stream::new(
3186            out_location.clone(),
3187            HydroNode::YieldConcat {
3188                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3189                metadata: out_location
3190                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
3191            },
3192        )
3193    }
3194
3195    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
3196    /// such as `fold` retrain their memory across ticks rather than resetting across batches of
3197    /// input.
3198    ///
3199    /// This API is particularly useful for stateful computation on batches of data, such as
3200    /// maintaining an accumulated state that is up to date with the current batch.
3201    ///
3202    /// # Example
3203    /// ```rust
3204    /// # #[cfg(feature = "deploy")] {
3205    /// # use hydro_lang::prelude::*;
3206    /// # use futures::StreamExt;
3207    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3208    /// let tick = process.tick();
3209    /// # // ticks are lazy by default, forces the second tick to run
3210    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3211    /// # let batch_first_tick = process
3212    /// #   .source_iter(q!(vec![1, 2, 3, 4]))
3213    /// #  .batch(&tick, nondet!(/** test */));
3214    /// # let batch_second_tick = process
3215    /// #   .source_iter(q!(vec![5, 6, 7]))
3216    /// #   .batch(&tick, nondet!(/** test */))
3217    /// #   .defer_tick(); // appears on the second tick
3218    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
3219    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
3220    ///
3221    /// input.batch(&tick, nondet!(/** test */))
3222    ///     .across_ticks(|s| s.count()).all_ticks()
3223    /// # }, |mut stream| async move {
3224    /// // [4, 7]
3225    /// assert_eq!(stream.next().await.unwrap(), 4);
3226    /// assert_eq!(stream.next().await.unwrap(), 7);
3227    /// # }));
3228    /// # }
3229    /// ```
3230    pub fn across_ticks<Out: BatchAtomic<'a>>(
3231        self,
3232        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
3233    ) -> Out::Batched {
3234        thunk(self.all_ticks_atomic()).batched_atomic()
3235    }
3236
3237    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
3238    /// always has the elements of `self` at tick `T - 1`.
3239    ///
3240    /// At tick `0`, the output stream is empty, since there is no previous tick.
3241    ///
3242    /// This operator enables stateful iterative processing with ticks, by sending data from one
3243    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
3244    ///
3245    /// # Example
3246    /// ```rust
3247    /// # #[cfg(feature = "deploy")] {
3248    /// # use hydro_lang::prelude::*;
3249    /// # use futures::StreamExt;
3250    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3251    /// let tick = process.tick();
3252    /// // ticks are lazy by default, forces the second tick to run
3253    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3254    ///
3255    /// let batch_first_tick = process
3256    ///   .source_iter(q!(vec![1, 2, 3, 4]))
3257    ///   .batch(&tick, nondet!(/** test */));
3258    /// let batch_second_tick = process
3259    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
3260    ///   .batch(&tick, nondet!(/** test */))
3261    ///   .defer_tick(); // appears on the second tick
3262    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
3263    ///
3264    /// changes_across_ticks.clone().filter_not_in(
3265    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
3266    /// ).all_ticks()
3267    /// # }, |mut stream| async move {
3268    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
3269    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
3270    /// #     assert_eq!(stream.next().await.unwrap(), w);
3271    /// # }
3272    /// # }));
3273    /// # }
3274    /// ```
3275    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
3276        Stream::new(
3277            self.location.clone(),
3278            HydroNode::DeferTick {
3279                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3280                metadata: self
3281                    .location
3282                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3283            },
3284        )
3285    }
3286}
3287
3288#[cfg(test)]
3289mod tests {
3290    #[cfg(feature = "deploy")]
3291    use futures::{SinkExt, StreamExt};
3292    #[cfg(feature = "deploy")]
3293    use hydro_deploy::Deployment;
3294    #[cfg(feature = "deploy")]
3295    use serde::{Deserialize, Serialize};
3296    #[cfg(any(feature = "deploy", feature = "sim"))]
3297    use stageleft::q;
3298
3299    #[cfg(any(feature = "deploy", feature = "sim"))]
3300    use crate::compile::builder::FlowBuilder;
3301    #[cfg(feature = "deploy")]
3302    use crate::live_collections::sliced::sliced;
3303    #[cfg(feature = "deploy")]
3304    use crate::live_collections::stream::ExactlyOnce;
3305    #[cfg(feature = "sim")]
3306    use crate::live_collections::stream::NoOrder;
3307    #[cfg(any(feature = "deploy", feature = "sim"))]
3308    use crate::live_collections::stream::TotalOrder;
3309    #[cfg(any(feature = "deploy", feature = "sim"))]
3310    use crate::location::Location;
3311    #[cfg(feature = "sim")]
3312    use crate::networking::TCP;
3313    #[cfg(any(feature = "deploy", feature = "sim"))]
3314    use crate::nondet::nondet;
3315
3316    mod backtrace_chained_ops;
3317
3318    #[cfg(feature = "deploy")]
3319    struct P1 {}
3320    #[cfg(feature = "deploy")]
3321    struct P2 {}
3322
3323    #[cfg(feature = "deploy")]
3324    #[derive(Serialize, Deserialize, Debug)]
3325    struct SendOverNetwork {
3326        n: u32,
3327    }
3328
3329    #[cfg(feature = "deploy")]
3330    #[tokio::test]
3331    async fn first_ten_distributed() {
3332        use crate::networking::TCP;
3333
3334        let mut deployment = Deployment::new();
3335
3336        let mut flow = FlowBuilder::new();
3337        let first_node = flow.process::<P1>();
3338        let second_node = flow.process::<P2>();
3339        let external = flow.external::<P2>();
3340
3341        let numbers = first_node.source_iter(q!(0..10));
3342        let out_port = numbers
3343            .map(q!(|n| SendOverNetwork { n }))
3344            .send(&second_node, TCP.fail_stop().bincode())
3345            .send_bincode_external(&external);
3346
3347        let nodes = flow
3348            .with_process(&first_node, deployment.Localhost())
3349            .with_process(&second_node, deployment.Localhost())
3350            .with_external(&external, deployment.Localhost())
3351            .deploy(&mut deployment);
3352
3353        deployment.deploy().await.unwrap();
3354
3355        let mut external_out = nodes.connect(out_port).await;
3356
3357        deployment.start().await.unwrap();
3358
3359        for i in 0..10 {
3360            assert_eq!(external_out.next().await.unwrap().n, i);
3361        }
3362    }
3363
3364    #[cfg(feature = "deploy")]
3365    #[tokio::test]
3366    async fn first_cardinality() {
3367        let mut deployment = Deployment::new();
3368
3369        let mut flow = FlowBuilder::new();
3370        let node = flow.process::<()>();
3371        let external = flow.external::<()>();
3372
3373        let node_tick = node.tick();
3374        let count = node_tick
3375            .singleton(q!([1, 2, 3]))
3376            .into_stream()
3377            .flatten_ordered()
3378            .first()
3379            .into_stream()
3380            .count()
3381            .all_ticks()
3382            .send_bincode_external(&external);
3383
3384        let nodes = flow
3385            .with_process(&node, deployment.Localhost())
3386            .with_external(&external, deployment.Localhost())
3387            .deploy(&mut deployment);
3388
3389        deployment.deploy().await.unwrap();
3390
3391        let mut external_out = nodes.connect(count).await;
3392
3393        deployment.start().await.unwrap();
3394
3395        assert_eq!(external_out.next().await.unwrap(), 1);
3396    }
3397
3398    #[cfg(feature = "deploy")]
3399    #[tokio::test]
3400    async fn unbounded_reduce_remembers_state() {
3401        let mut deployment = Deployment::new();
3402
3403        let mut flow = FlowBuilder::new();
3404        let node = flow.process::<()>();
3405        let external = flow.external::<()>();
3406
3407        let (input_port, input) = node.source_external_bincode(&external);
3408        let out = input
3409            .reduce(q!(|acc, v| *acc += v))
3410            .sample_eager(nondet!(/** test */))
3411            .send_bincode_external(&external);
3412
3413        let nodes = flow
3414            .with_process(&node, deployment.Localhost())
3415            .with_external(&external, deployment.Localhost())
3416            .deploy(&mut deployment);
3417
3418        deployment.deploy().await.unwrap();
3419
3420        let mut external_in = nodes.connect(input_port).await;
3421        let mut external_out = nodes.connect(out).await;
3422
3423        deployment.start().await.unwrap();
3424
3425        external_in.send(1).await.unwrap();
3426        assert_eq!(external_out.next().await.unwrap(), 1);
3427
3428        external_in.send(2).await.unwrap();
3429        assert_eq!(external_out.next().await.unwrap(), 3);
3430    }
3431
3432    #[cfg(feature = "deploy")]
3433    #[tokio::test]
3434    async fn top_level_bounded_cross_singleton() {
3435        let mut deployment = Deployment::new();
3436
3437        let mut flow = FlowBuilder::new();
3438        let node = flow.process::<()>();
3439        let external = flow.external::<()>();
3440
3441        let (input_port, input) =
3442            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3443
3444        let out = input
3445            .cross_singleton(
3446                node.source_iter(q!(vec![1, 2, 3]))
3447                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
3448            )
3449            .send_bincode_external(&external);
3450
3451        let nodes = flow
3452            .with_process(&node, deployment.Localhost())
3453            .with_external(&external, deployment.Localhost())
3454            .deploy(&mut deployment);
3455
3456        deployment.deploy().await.unwrap();
3457
3458        let mut external_in = nodes.connect(input_port).await;
3459        let mut external_out = nodes.connect(out).await;
3460
3461        deployment.start().await.unwrap();
3462
3463        external_in.send(1).await.unwrap();
3464        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3465
3466        external_in.send(2).await.unwrap();
3467        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3468    }
3469
3470    #[cfg(feature = "deploy")]
3471    #[tokio::test]
3472    async fn top_level_bounded_reduce_cardinality() {
3473        let mut deployment = Deployment::new();
3474
3475        let mut flow = FlowBuilder::new();
3476        let node = flow.process::<()>();
3477        let external = flow.external::<()>();
3478
3479        let (input_port, input) =
3480            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3481
3482        let out = sliced! {
3483            let input = use(input, nondet!(/** test */));
3484            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
3485            input.cross_singleton(v.into_stream().count())
3486        }
3487        .send_bincode_external(&external);
3488
3489        let nodes = flow
3490            .with_process(&node, deployment.Localhost())
3491            .with_external(&external, deployment.Localhost())
3492            .deploy(&mut deployment);
3493
3494        deployment.deploy().await.unwrap();
3495
3496        let mut external_in = nodes.connect(input_port).await;
3497        let mut external_out = nodes.connect(out).await;
3498
3499        deployment.start().await.unwrap();
3500
3501        external_in.send(1).await.unwrap();
3502        assert_eq!(external_out.next().await.unwrap(), (1, 1));
3503
3504        external_in.send(2).await.unwrap();
3505        assert_eq!(external_out.next().await.unwrap(), (2, 1));
3506    }
3507
3508    #[cfg(feature = "deploy")]
3509    #[tokio::test]
3510    async fn top_level_bounded_into_singleton_cardinality() {
3511        let mut deployment = Deployment::new();
3512
3513        let mut flow = FlowBuilder::new();
3514        let node = flow.process::<()>();
3515        let external = flow.external::<()>();
3516
3517        let (input_port, input) =
3518            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3519
3520        let out = sliced! {
3521            let input = use(input, nondet!(/** test */));
3522            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
3523            input.cross_singleton(v.into_stream().count())
3524        }
3525        .send_bincode_external(&external);
3526
3527        let nodes = flow
3528            .with_process(&node, deployment.Localhost())
3529            .with_external(&external, deployment.Localhost())
3530            .deploy(&mut deployment);
3531
3532        deployment.deploy().await.unwrap();
3533
3534        let mut external_in = nodes.connect(input_port).await;
3535        let mut external_out = nodes.connect(out).await;
3536
3537        deployment.start().await.unwrap();
3538
3539        external_in.send(1).await.unwrap();
3540        assert_eq!(external_out.next().await.unwrap(), (1, 1));
3541
3542        external_in.send(2).await.unwrap();
3543        assert_eq!(external_out.next().await.unwrap(), (2, 1));
3544    }
3545
3546    #[cfg(feature = "deploy")]
3547    #[tokio::test]
3548    async fn atomic_fold_replays_each_tick() {
3549        let mut deployment = Deployment::new();
3550
3551        let mut flow = FlowBuilder::new();
3552        let node = flow.process::<()>();
3553        let external = flow.external::<()>();
3554
3555        let (input_port, input) =
3556            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3557        let tick = node.tick();
3558
3559        let out = input
3560            .batch(&tick, nondet!(/** test */))
3561            .cross_singleton(
3562                node.source_iter(q!(vec![1, 2, 3]))
3563                    .atomic()
3564                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
3565                    .snapshot_atomic(&tick, nondet!(/** test */)),
3566            )
3567            .all_ticks()
3568            .send_bincode_external(&external);
3569
3570        let nodes = flow
3571            .with_process(&node, deployment.Localhost())
3572            .with_external(&external, deployment.Localhost())
3573            .deploy(&mut deployment);
3574
3575        deployment.deploy().await.unwrap();
3576
3577        let mut external_in = nodes.connect(input_port).await;
3578        let mut external_out = nodes.connect(out).await;
3579
3580        deployment.start().await.unwrap();
3581
3582        external_in.send(1).await.unwrap();
3583        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3584
3585        external_in.send(2).await.unwrap();
3586        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3587    }
3588
3589    #[cfg(feature = "deploy")]
3590    #[tokio::test]
3591    async fn unbounded_scan_remembers_state() {
3592        let mut deployment = Deployment::new();
3593
3594        let mut flow = FlowBuilder::new();
3595        let node = flow.process::<()>();
3596        let external = flow.external::<()>();
3597
3598        let (input_port, input) = node.source_external_bincode(&external);
3599        let out = input
3600            .scan(
3601                q!(|| 0),
3602                q!(|acc, v| {
3603                    *acc += v;
3604                    Some(*acc)
3605                }),
3606            )
3607            .send_bincode_external(&external);
3608
3609        let nodes = flow
3610            .with_process(&node, deployment.Localhost())
3611            .with_external(&external, deployment.Localhost())
3612            .deploy(&mut deployment);
3613
3614        deployment.deploy().await.unwrap();
3615
3616        let mut external_in = nodes.connect(input_port).await;
3617        let mut external_out = nodes.connect(out).await;
3618
3619        deployment.start().await.unwrap();
3620
3621        external_in.send(1).await.unwrap();
3622        assert_eq!(external_out.next().await.unwrap(), 1);
3623
3624        external_in.send(2).await.unwrap();
3625        assert_eq!(external_out.next().await.unwrap(), 3);
3626    }
3627
3628    #[cfg(feature = "deploy")]
3629    #[tokio::test]
3630    async fn unbounded_enumerate_remembers_state() {
3631        let mut deployment = Deployment::new();
3632
3633        let mut flow = FlowBuilder::new();
3634        let node = flow.process::<()>();
3635        let external = flow.external::<()>();
3636
3637        let (input_port, input) = node.source_external_bincode(&external);
3638        let out = input.enumerate().send_bincode_external(&external);
3639
3640        let nodes = flow
3641            .with_process(&node, deployment.Localhost())
3642            .with_external(&external, deployment.Localhost())
3643            .deploy(&mut deployment);
3644
3645        deployment.deploy().await.unwrap();
3646
3647        let mut external_in = nodes.connect(input_port).await;
3648        let mut external_out = nodes.connect(out).await;
3649
3650        deployment.start().await.unwrap();
3651
3652        external_in.send(1).await.unwrap();
3653        assert_eq!(external_out.next().await.unwrap(), (0, 1));
3654
3655        external_in.send(2).await.unwrap();
3656        assert_eq!(external_out.next().await.unwrap(), (1, 2));
3657    }
3658
3659    #[cfg(feature = "deploy")]
3660    #[tokio::test]
3661    async fn unbounded_unique_remembers_state() {
3662        let mut deployment = Deployment::new();
3663
3664        let mut flow = FlowBuilder::new();
3665        let node = flow.process::<()>();
3666        let external = flow.external::<()>();
3667
3668        let (input_port, input) =
3669            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3670        let out = input.unique().send_bincode_external(&external);
3671
3672        let nodes = flow
3673            .with_process(&node, deployment.Localhost())
3674            .with_external(&external, deployment.Localhost())
3675            .deploy(&mut deployment);
3676
3677        deployment.deploy().await.unwrap();
3678
3679        let mut external_in = nodes.connect(input_port).await;
3680        let mut external_out = nodes.connect(out).await;
3681
3682        deployment.start().await.unwrap();
3683
3684        external_in.send(1).await.unwrap();
3685        assert_eq!(external_out.next().await.unwrap(), 1);
3686
3687        external_in.send(2).await.unwrap();
3688        assert_eq!(external_out.next().await.unwrap(), 2);
3689
3690        external_in.send(1).await.unwrap();
3691        external_in.send(3).await.unwrap();
3692        assert_eq!(external_out.next().await.unwrap(), 3);
3693    }
3694
3695    #[cfg(feature = "sim")]
3696    #[test]
3697    #[should_panic]
3698    fn sim_batch_nondet_size() {
3699        let mut flow = FlowBuilder::new();
3700        let node = flow.process::<()>();
3701
3702        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3703
3704        let tick = node.tick();
3705        let out_recv = input
3706            .batch(&tick, nondet!(/** test */))
3707            .count()
3708            .all_ticks()
3709            .sim_output();
3710
3711        flow.sim().exhaustive(async || {
3712            in_send.send(());
3713            in_send.send(());
3714            in_send.send(());
3715
3716            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3717        });
3718    }
3719
3720    #[cfg(feature = "sim")]
3721    #[test]
3722    fn sim_batch_preserves_order() {
3723        let mut flow = FlowBuilder::new();
3724        let node = flow.process::<()>();
3725
3726        let (in_send, input) = node.sim_input();
3727
3728        let tick = node.tick();
3729        let out_recv = input
3730            .batch(&tick, nondet!(/** test */))
3731            .all_ticks()
3732            .sim_output();
3733
3734        flow.sim().exhaustive(async || {
3735            in_send.send(1);
3736            in_send.send(2);
3737            in_send.send(3);
3738
3739            out_recv.assert_yields_only([1, 2, 3]).await;
3740        });
3741    }
3742
3743    #[cfg(feature = "sim")]
3744    #[test]
3745    #[should_panic]
3746    fn sim_batch_unordered_shuffles() {
3747        let mut flow = FlowBuilder::new();
3748        let node = flow.process::<()>();
3749
3750        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3751
3752        let tick = node.tick();
3753        let batch = input.batch(&tick, nondet!(/** test */));
3754        let out_recv = batch
3755            .clone()
3756            .min()
3757            .zip(batch.max())
3758            .all_ticks()
3759            .sim_output();
3760
3761        flow.sim().exhaustive(async || {
3762            in_send.send_many_unordered([1, 2, 3]);
3763
3764            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3765                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3766            }
3767        });
3768    }
3769
3770    #[cfg(feature = "sim")]
3771    #[test]
3772    fn sim_batch_unordered_shuffles_count() {
3773        let mut flow = FlowBuilder::new();
3774        let node = flow.process::<()>();
3775
3776        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3777
3778        let tick = node.tick();
3779        let batch = input.batch(&tick, nondet!(/** test */));
3780        let out_recv = batch.all_ticks().sim_output();
3781
3782        let instance_count = flow.sim().exhaustive(async || {
3783            in_send.send_many_unordered([1, 2, 3, 4]);
3784            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3785        });
3786
3787        assert_eq!(
3788            instance_count,
3789            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3790        )
3791    }
3792
3793    #[cfg(feature = "sim")]
3794    #[test]
3795    #[should_panic]
3796    fn sim_observe_order_batched() {
3797        let mut flow = FlowBuilder::new();
3798        let node = flow.process::<()>();
3799
3800        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3801
3802        let tick = node.tick();
3803        let batch = input.batch(&tick, nondet!(/** test */));
3804        let out_recv = batch
3805            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3806            .all_ticks()
3807            .sim_output();
3808
3809        flow.sim().exhaustive(async || {
3810            in_send.send_many_unordered([1, 2, 3, 4]);
3811            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3812        });
3813    }
3814
3815    #[cfg(feature = "sim")]
3816    #[test]
3817    fn sim_observe_order_batched_count() {
3818        let mut flow = FlowBuilder::new();
3819        let node = flow.process::<()>();
3820
3821        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3822
3823        let tick = node.tick();
3824        let batch = input.batch(&tick, nondet!(/** test */));
3825        let out_recv = batch
3826            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3827            .all_ticks()
3828            .sim_output();
3829
3830        let instance_count = flow.sim().exhaustive(async || {
3831            in_send.send_many_unordered([1, 2, 3, 4]);
3832            let _ = out_recv.collect::<Vec<_>>().await;
3833        });
3834
3835        assert_eq!(
3836            instance_count,
3837            192 // 4! * 2^{4 - 1}
3838        )
3839    }
3840
3841    #[cfg(feature = "sim")]
3842    #[test]
3843    fn sim_unordered_count_instance_count() {
3844        let mut flow = FlowBuilder::new();
3845        let node = flow.process::<()>();
3846
3847        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3848
3849        let tick = node.tick();
3850        let out_recv = input
3851            .count()
3852            .snapshot(&tick, nondet!(/** test */))
3853            .all_ticks()
3854            .sim_output();
3855
3856        let instance_count = flow.sim().exhaustive(async || {
3857            in_send.send_many_unordered([1, 2, 3, 4]);
3858            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3859        });
3860
3861        assert_eq!(
3862            instance_count,
3863            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3864        )
3865    }
3866
3867    #[cfg(feature = "sim")]
3868    #[test]
3869    fn sim_top_level_assume_ordering() {
3870        let mut flow = FlowBuilder::new();
3871        let node = flow.process::<()>();
3872
3873        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3874
3875        let out_recv = input
3876            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3877            .sim_output();
3878
3879        let instance_count = flow.sim().exhaustive(async || {
3880            in_send.send_many_unordered([1, 2, 3]);
3881            let mut out = out_recv.collect::<Vec<_>>().await;
3882            out.sort();
3883            assert_eq!(out, vec![1, 2, 3]);
3884        });
3885
3886        assert_eq!(instance_count, 6)
3887    }
3888
3889    #[cfg(feature = "sim")]
3890    #[test]
3891    fn sim_top_level_assume_ordering_cycle_back() {
3892        let mut flow = FlowBuilder::new();
3893        let node = flow.process::<()>();
3894        let node2 = flow.process::<()>();
3895
3896        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3897
3898        let (complete_cycle_back, cycle_back) =
3899            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3900        let ordered = input
3901            .merge_unordered(cycle_back)
3902            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3903        complete_cycle_back.complete(
3904            ordered
3905                .clone()
3906                .map(q!(|v| v + 1))
3907                .filter(q!(|v| v % 2 == 1))
3908                .send(&node2, TCP.fail_stop().bincode())
3909                .send(&node, TCP.fail_stop().bincode()),
3910        );
3911
3912        let out_recv = ordered.sim_output();
3913
3914        let mut saw = false;
3915        let instance_count = flow.sim().exhaustive(async || {
3916            in_send.send_many_unordered([0, 2]);
3917            let out = out_recv.collect::<Vec<_>>().await;
3918
3919            if out.starts_with(&[0, 1, 2]) {
3920                saw = true;
3921            }
3922        });
3923
3924        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3925        assert_eq!(instance_count, 6);
3926    }
3927
3928    #[cfg(feature = "sim")]
3929    #[test]
3930    fn sim_top_level_assume_ordering_cycle_back_tick() {
3931        let mut flow = FlowBuilder::new();
3932        let node = flow.process::<()>();
3933        let node2 = flow.process::<()>();
3934
3935        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3936
3937        let (complete_cycle_back, cycle_back) =
3938            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3939        let ordered = input
3940            .merge_unordered(cycle_back)
3941            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3942        complete_cycle_back.complete(
3943            ordered
3944                .clone()
3945                .batch(&node.tick(), nondet!(/** test */))
3946                .all_ticks()
3947                .map(q!(|v| v + 1))
3948                .filter(q!(|v| v % 2 == 1))
3949                .send(&node2, TCP.fail_stop().bincode())
3950                .send(&node, TCP.fail_stop().bincode()),
3951        );
3952
3953        let out_recv = ordered.sim_output();
3954
3955        let mut saw = false;
3956        let instance_count = flow.sim().exhaustive(async || {
3957            in_send.send_many_unordered([0, 2]);
3958            let out = out_recv.collect::<Vec<_>>().await;
3959
3960            if out.starts_with(&[0, 1, 2]) {
3961                saw = true;
3962            }
3963        });
3964
3965        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3966        assert_eq!(instance_count, 58);
3967    }
3968
3969    #[cfg(feature = "sim")]
3970    #[test]
3971    fn sim_top_level_assume_ordering_multiple() {
3972        let mut flow = FlowBuilder::new();
3973        let node = flow.process::<()>();
3974        let node2 = flow.process::<()>();
3975
3976        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3977        let (_, input2) = node.sim_input::<_, NoOrder, _>();
3978
3979        let (complete_cycle_back, cycle_back) =
3980            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3981        let input1_ordered = input
3982            .clone()
3983            .merge_unordered(cycle_back)
3984            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3985        let foo = input1_ordered
3986            .clone()
3987            .map(q!(|v| v + 3))
3988            .weaken_ordering::<NoOrder>()
3989            .merge_unordered(input2)
3990            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3991
3992        complete_cycle_back.complete(
3993            foo.filter(q!(|v| *v == 3))
3994                .send(&node2, TCP.fail_stop().bincode())
3995                .send(&node, TCP.fail_stop().bincode()),
3996        );
3997
3998        let out_recv = input1_ordered.sim_output();
3999
4000        let mut saw = false;
4001        let instance_count = flow.sim().exhaustive(async || {
4002            in_send.send_many_unordered([0, 1]);
4003            let out = out_recv.collect::<Vec<_>>().await;
4004
4005            if out.starts_with(&[0, 3, 1]) {
4006                saw = true;
4007            }
4008        });
4009
4010        assert!(saw, "did not see an instance with 0, 3, 1 in order");
4011        assert_eq!(instance_count, 24);
4012    }
4013
4014    #[cfg(feature = "sim")]
4015    #[test]
4016    fn sim_atomic_assume_ordering_cycle_back() {
4017        let mut flow = FlowBuilder::new();
4018        let node = flow.process::<()>();
4019        let node2 = flow.process::<()>();
4020
4021        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
4022
4023        let (complete_cycle_back, cycle_back) =
4024            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
4025        let ordered = input
4026            .merge_unordered(cycle_back)
4027            .atomic()
4028            .assume_ordering::<TotalOrder>(nondet!(/** test */))
4029            .end_atomic();
4030        complete_cycle_back.complete(
4031            ordered
4032                .clone()
4033                .map(q!(|v| v + 1))
4034                .filter(q!(|v| v % 2 == 1))
4035                .send(&node2, TCP.fail_stop().bincode())
4036                .send(&node, TCP.fail_stop().bincode()),
4037        );
4038
4039        let out_recv = ordered.sim_output();
4040
4041        let instance_count = flow.sim().exhaustive(async || {
4042            in_send.send_many_unordered([0, 2]);
4043            let out = out_recv.collect::<Vec<_>>().await;
4044            assert_eq!(out.len(), 4);
4045        });
4046        assert_eq!(instance_count, 22);
4047    }
4048
4049    #[cfg(feature = "deploy")]
4050    #[tokio::test]
4051    async fn partition_evens_odds() {
4052        let mut deployment = Deployment::new();
4053
4054        let mut flow = FlowBuilder::new();
4055        let node = flow.process::<()>();
4056        let external = flow.external::<()>();
4057
4058        let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
4059        let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
4060        let evens_port = evens.send_bincode_external(&external);
4061        let odds_port = odds.send_bincode_external(&external);
4062
4063        let nodes = flow
4064            .with_process(&node, deployment.Localhost())
4065            .with_external(&external, deployment.Localhost())
4066            .deploy(&mut deployment);
4067
4068        deployment.deploy().await.unwrap();
4069
4070        let mut evens_out = nodes.connect(evens_port).await;
4071        let mut odds_out = nodes.connect(odds_port).await;
4072
4073        deployment.start().await.unwrap();
4074
4075        let mut even_results = Vec::new();
4076        for _ in 0..3 {
4077            even_results.push(evens_out.next().await.unwrap());
4078        }
4079        even_results.sort();
4080        assert_eq!(even_results, vec![2, 4, 6]);
4081
4082        let mut odd_results = Vec::new();
4083        for _ in 0..3 {
4084            odd_results.push(odds_out.next().await.unwrap());
4085        }
4086        odd_results.sort();
4087        assert_eq!(odd_results, vec![1, 3, 5]);
4088    }
4089
4090    #[cfg(feature = "deploy")]
4091    #[tokio::test]
4092    async fn unconsumed_inspect_still_runs() {
4093        use crate::deploy::DeployCrateWrapper;
4094
4095        let mut deployment = Deployment::new();
4096
4097        let mut flow = FlowBuilder::new();
4098        let node = flow.process::<()>();
4099
4100        // The return value of .inspect() is intentionally dropped.
4101        // Before the Null-root fix, this would silently do nothing.
4102        node.source_iter(q!(0..5))
4103            .inspect(q!(|x| println!("inspect: {}", x)));
4104
4105        let nodes = flow
4106            .with_process(&node, deployment.Localhost())
4107            .deploy(&mut deployment);
4108
4109        deployment.deploy().await.unwrap();
4110
4111        let mut stdout = nodes.get_process(&node).stdout();
4112
4113        deployment.start().await.unwrap();
4114
4115        let mut lines = Vec::new();
4116        for _ in 0..5 {
4117            lines.push(stdout.recv().await.unwrap());
4118        }
4119        lines.sort();
4120        assert_eq!(
4121            lines,
4122            vec![
4123                "inspect: 0",
4124                "inspect: 1",
4125                "inspect: 2",
4126                "inspect: 3",
4127                "inspect: 4",
4128            ]
4129        );
4130    }
4131
4132    #[cfg(feature = "sim")]
4133    #[test]
4134    fn sim_limit() {
4135        let mut flow = FlowBuilder::new();
4136        let node = flow.process::<()>();
4137
4138        let (in_send, input) = node.sim_input();
4139
4140        let out_recv = input.limit(q!(3)).sim_output();
4141
4142        flow.sim().exhaustive(async || {
4143            in_send.send(1);
4144            in_send.send(2);
4145            in_send.send(3);
4146            in_send.send(4);
4147            in_send.send(5);
4148
4149            out_recv.assert_yields_only([1, 2, 3]).await;
4150        });
4151    }
4152
4153    #[cfg(feature = "sim")]
4154    #[test]
4155    fn sim_limit_zero() {
4156        let mut flow = FlowBuilder::new();
4157        let node = flow.process::<()>();
4158
4159        let (in_send, input) = node.sim_input();
4160
4161        let out_recv = input.limit(q!(0)).sim_output();
4162
4163        flow.sim().exhaustive(async || {
4164            in_send.send(1);
4165            in_send.send(2);
4166
4167            out_recv.assert_yields_only::<i32, _>([]).await;
4168        });
4169    }
4170
4171    #[cfg(feature = "sim")]
4172    #[test]
4173    fn sim_merge_ordered() {
4174        let mut flow = FlowBuilder::new();
4175        let node = flow.process::<()>();
4176
4177        let (in_send, input) = node.sim_input();
4178        let (in_send2, input2) = node.sim_input();
4179
4180        let out_recv = input
4181            .merge_ordered(input2, nondet!(/** test */))
4182            .sim_output();
4183
4184        let mut saw_out_of_order = false;
4185        let instances = flow.sim().exhaustive(async || {
4186            in_send.send(1);
4187            in_send.send(2);
4188            in_send2.send(3);
4189            in_send2.send(4);
4190
4191            let out = out_recv.collect::<Vec<_>>().await;
4192
4193            if out == [1, 3, 2, 4] {
4194                saw_out_of_order = true;
4195            }
4196
4197            // Assert ordering preservation: elements from each input must
4198            // appear in their original relative order.
4199            let mut first_elements = out.iter().filter(|v| **v <= 2).copied().collect::<Vec<_>>();
4200            let mut second_elements = out.iter().filter(|v| **v > 2).copied().collect::<Vec<_>>();
4201            assert_eq!(
4202                first_elements,
4203                vec![1, 2],
4204                "first input order violated: {:?}",
4205                out
4206            );
4207            assert_eq!(
4208                second_elements,
4209                vec![3, 4],
4210                "second input order violated: {:?}",
4211                out
4212            );
4213
4214            first_elements.append(&mut second_elements);
4215            first_elements.sort();
4216            assert_eq!(first_elements, vec![1, 2, 3, 4]);
4217        });
4218
4219        assert!(saw_out_of_order);
4220        assert_eq!(instances, 6);
4221    }
4222
4223    /// Tests that merge_ordered passes through elements when only one input
4224    /// has data.
4225    #[cfg(feature = "sim")]
4226    #[test]
4227    fn sim_merge_ordered_one_empty() {
4228        let mut flow = FlowBuilder::new();
4229        let node = flow.process::<()>();
4230
4231        let (in_send, input) = node.sim_input();
4232        let (_in_send2, input2) = node.sim_input();
4233
4234        let out_recv = input
4235            .merge_ordered(input2, nondet!(/** test */))
4236            .sim_output();
4237
4238        let instances = flow.sim().exhaustive(async || {
4239            in_send.send(1);
4240            in_send.send(2);
4241
4242            let out = out_recv.collect::<Vec<_>>().await;
4243            assert_eq!(out, vec![1, 2]);
4244        });
4245
4246        // Only one possible interleaving when one input is empty
4247        assert_eq!(instances, 1);
4248    }
4249
4250    /// Tests that merge_ordered correctly handles feedback cycles.
4251    /// An element output from merge_ordered is filtered and cycled back to
4252    /// one of its inputs. The one-at-a-time release must allow the cycled-back
4253    /// element to arrive and potentially be emitted before elements still
4254    /// waiting on the other input.
4255    #[cfg(feature = "sim")]
4256    #[test]
4257    fn sim_merge_ordered_cycle_back() {
4258        let mut flow = FlowBuilder::new();
4259        let node = flow.process::<()>();
4260
4261        let (in_send, input) = node.sim_input();
4262
4263        // Create a forward ref for the cycle back
4264        let (complete_cycle_back, cycle_back) =
4265            node.forward_ref::<super::Stream<_, _, _, TotalOrder>>();
4266
4267        // merge_ordered: input (external) with cycle_back
4268        let merged = input.merge_ordered(cycle_back, nondet!(/** test */));
4269
4270        // Cycle back: elements equal to 1 get mapped to 10 and fed back
4271        complete_cycle_back.complete(merged.clone().filter(q!(|v| *v == 1)).map(q!(|v| v * 10)));
4272
4273        let out_recv = merged.sim_output();
4274
4275        // Send 1 and 2. Element 1 should cycle back as 10.
4276        // Valid orderings must have 1 before 10 (since 10 depends on 1).
4277        let mut saw_cycle_before_second = false;
4278        flow.sim().exhaustive(async || {
4279            in_send.send(1);
4280            in_send.send(2);
4281
4282            let out = out_recv.collect::<Vec<_>>().await;
4283
4284            // 10 must always come after 1 (causal dependency)
4285            let pos_1 = out.iter().position(|v| *v == 1).unwrap();
4286            let pos_10 = out.iter().position(|v| *v == 10).unwrap();
4287            assert!(pos_1 < pos_10, "causal order violated: {:?}", out);
4288
4289            // Check if we see [1, 10, 2] — the cycled element beats the second input
4290            if out == [1, 10, 2] {
4291                saw_cycle_before_second = true;
4292            }
4293
4294            let mut sorted = out;
4295            sorted.sort();
4296            assert_eq!(sorted, vec![1, 2, 10]);
4297        });
4298
4299        assert!(
4300            saw_cycle_before_second,
4301            "never saw the cycled element arrive before the second input element"
4302        );
4303    }
4304
4305    /// Tests that merge_ordered correctly interleaves when one input has a
4306    /// delayed element. With a: [1, _delay_, 2] and b: [3, 4], the delayed
4307    /// element 2 should be able to appear after b's elements.
4308    #[cfg(feature = "sim")]
4309    #[test]
4310    fn sim_merge_ordered_delayed() {
4311        let mut flow = FlowBuilder::new();
4312        let node = flow.process::<()>();
4313
4314        let (in_send, input) = node.sim_input();
4315        let (in_send2, input2) = node.sim_input();
4316
4317        let out_recv = input
4318            .merge_ordered(input2, nondet!(/** test */))
4319            .sim_output();
4320
4321        let mut saw_delayed_interleaving = false;
4322        flow.sim().exhaustive(async || {
4323            // Send 1 from a, and 3, 4 from b
4324            in_send.send(1);
4325            in_send2.send(3);
4326            in_send2.send(4);
4327
4328            // Collect what's available so far
4329            let first_batch = out_recv.collect::<Vec<_>>().await;
4330
4331            // Now send the delayed element 2 from a
4332            in_send.send(2);
4333            let second_batch = out_recv.collect::<Vec<_>>().await;
4334
4335            let mut all: Vec<_> = first_batch
4336                .iter()
4337                .chain(second_batch.iter())
4338                .copied()
4339                .collect();
4340
4341            // Check if we saw [1, 3, 4, 2] — the delayed interleaving
4342            if all == [1, 3, 4, 2] {
4343                saw_delayed_interleaving = true;
4344            }
4345
4346            all.sort();
4347            assert_eq!(all, vec![1, 2, 3, 4]);
4348        });
4349
4350        assert!(saw_delayed_interleaving);
4351    }
4352
4353    /// Deploy test: merge_ordered with a delayed element on one input.
4354    /// Sends a=1, b=3, b=4, then after receiving those, sends a=2.
4355    /// Expects to see [1, 3, 4] first, then [2] — demonstrating that
4356    /// both inputs are pulled and the delayed element arrives later.
4357    #[cfg(feature = "deploy")]
4358    #[tokio::test]
4359    async fn deploy_merge_ordered_delayed() {
4360        let mut deployment = Deployment::new();
4361
4362        let mut flow = FlowBuilder::new();
4363        let node = flow.process::<()>();
4364        let external = flow.external::<()>();
4365
4366        let (input_a_port, input_a) = node.source_external_bincode(&external);
4367        let (input_b_port, input_b) = node.source_external_bincode(&external);
4368
4369        let out = input_a
4370            .assume_ordering(nondet!(/** test */))
4371            .merge_ordered(
4372                input_b.assume_ordering(nondet!(/** test */)),
4373                nondet!(/** test */),
4374            )
4375            .send_bincode_external(&external);
4376
4377        let nodes = flow
4378            .with_process(&node, deployment.Localhost())
4379            .with_external(&external, deployment.Localhost())
4380            .deploy(&mut deployment);
4381
4382        deployment.deploy().await.unwrap();
4383
4384        let mut ext_a = nodes.connect(input_a_port).await;
4385        let mut ext_b = nodes.connect(input_b_port).await;
4386        let mut ext_out = nodes.connect(out).await;
4387
4388        deployment.start().await.unwrap();
4389
4390        // Send a=1, b=3, b=4
4391        ext_a.send(1).await.unwrap();
4392        ext_b.send(3).await.unwrap();
4393        ext_b.send(4).await.unwrap();
4394
4395        // Collect the first 3 elements
4396        let mut received = Vec::new();
4397        for _ in 0..3 {
4398            received.push(ext_out.next().await.unwrap());
4399        }
4400
4401        // Now send the delayed a=2
4402        ext_a.send(2).await.unwrap();
4403        received.push(ext_out.next().await.unwrap());
4404
4405        // All elements should be present
4406        received.sort();
4407        assert_eq!(received, vec![1, 2, 3, 4]);
4408    }
4409
4410    #[cfg(feature = "deploy")]
4411    #[tokio::test]
4412    async fn monotone_fold_threshold() {
4413        use crate::properties::manual_proof;
4414
4415        let mut deployment = Deployment::new();
4416
4417        let mut flow = FlowBuilder::new();
4418        let node = flow.process::<()>();
4419        let external = flow.external::<()>();
4420
4421        let in_unbounded: super::Stream<_, _> =
4422            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4423        let sum = in_unbounded.fold(
4424            q!(|| 0),
4425            q!(
4426                |sum, v| {
4427                    *sum += v;
4428                },
4429                monotone = manual_proof!(/** test */)
4430            ),
4431        );
4432
4433        let threshold_out = sum
4434            .threshold_greater_or_equal(node.singleton(q!(7)))
4435            .send_bincode_external(&external);
4436
4437        let nodes = flow
4438            .with_process(&node, deployment.Localhost())
4439            .with_external(&external, deployment.Localhost())
4440            .deploy(&mut deployment);
4441
4442        deployment.deploy().await.unwrap();
4443
4444        let mut threshold_out = nodes.connect(threshold_out).await;
4445
4446        deployment.start().await.unwrap();
4447
4448        assert_eq!(threshold_out.next().await.unwrap(), 7);
4449    }
4450
4451    #[cfg(feature = "deploy")]
4452    #[tokio::test]
4453    async fn monotone_count_threshold() {
4454        let mut deployment = Deployment::new();
4455
4456        let mut flow = FlowBuilder::new();
4457        let node = flow.process::<()>();
4458        let external = flow.external::<()>();
4459
4460        let in_unbounded: super::Stream<_, _> =
4461            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4462        let sum = in_unbounded.count();
4463
4464        let threshold_out = sum
4465            .threshold_greater_or_equal(node.singleton(q!(3)))
4466            .send_bincode_external(&external);
4467
4468        let nodes = flow
4469            .with_process(&node, deployment.Localhost())
4470            .with_external(&external, deployment.Localhost())
4471            .deploy(&mut deployment);
4472
4473        deployment.deploy().await.unwrap();
4474
4475        let mut threshold_out = nodes.connect(threshold_out).await;
4476
4477        deployment.start().await.unwrap();
4478
4479        assert_eq!(threshold_out.next().await.unwrap(), 3);
4480    }
4481
4482    #[cfg(feature = "deploy")]
4483    #[tokio::test]
4484    async fn monotone_map_order_preserving_threshold() {
4485        use crate::properties::manual_proof;
4486
4487        let mut deployment = Deployment::new();
4488
4489        let mut flow = FlowBuilder::new();
4490        let node = flow.process::<()>();
4491        let external = flow.external::<()>();
4492
4493        let in_unbounded: super::Stream<_, _> =
4494            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4495        let sum = in_unbounded.fold(
4496            q!(|| 0),
4497            q!(
4498                |sum, v| {
4499                    *sum += v;
4500                },
4501                monotone = manual_proof!(/** test */)
4502            ),
4503        );
4504
4505        // map with order_preserving should preserve monotonicity
4506        let doubled = sum.map(q!(
4507            |v| v * 2,
4508            order_preserving = manual_proof!(/** doubling preserves order */)
4509        ));
4510
4511        let threshold_out = doubled
4512            .threshold_greater_or_equal(node.singleton(q!(14)))
4513            .send_bincode_external(&external);
4514
4515        let nodes = flow
4516            .with_process(&node, deployment.Localhost())
4517            .with_external(&external, deployment.Localhost())
4518            .deploy(&mut deployment);
4519
4520        deployment.deploy().await.unwrap();
4521
4522        let mut threshold_out = nodes.connect(threshold_out).await;
4523
4524        deployment.start().await.unwrap();
4525
4526        assert_eq!(threshold_out.next().await.unwrap(), 14);
4527    }
4528
4529    // === Compile-time type tests for join/cross_product ordering ===
4530
4531    #[cfg(any(feature = "deploy", feature = "sim"))]
4532    mod join_ordering_type_tests {
4533        use crate::live_collections::boundedness::{Bounded, Unbounded};
4534        use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
4535        use crate::location::{Location, Process};
4536
4537        #[expect(dead_code, reason = "compile-time type test")]
4538        fn join_unbounded_with_bounded_preserves_order<'a>(
4539            left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4540            right: Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4541        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4542            left.join(right)
4543        }
4544
4545        #[expect(dead_code, reason = "compile-time type test")]
4546        fn join_unbounded_with_unbounded_is_no_order<'a>(
4547            left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4548            right: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4549        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4550            left.join(right)
4551        }
4552
4553        #[expect(dead_code, reason = "compile-time type test")]
4554        fn join_bounded_with_bounded_preserves_order<'a, L: Location<'a>>(
4555            left: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4556            right: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4557        ) -> Stream<(i32, (char, char)), L, Bounded, TotalOrder, ExactlyOnce> {
4558            left.join(right)
4559        }
4560
4561        #[expect(dead_code, reason = "compile-time type test")]
4562        fn join_unbounded_noorder_with_bounded<'a>(
4563            left: Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce>,
4564            right: Stream<(i32, char), Process<'a>, Bounded, NoOrder, ExactlyOnce>,
4565        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4566            left.join(right)
4567        }
4568
4569        // === Compile-time type tests for cross_product ordering ===
4570
4571        #[expect(dead_code, reason = "compile-time type test")]
4572        fn cross_product_unbounded_with_bounded_preserves_order<'a>(
4573            left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4574            right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4575        ) -> Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4576            left.cross_product(right)
4577        }
4578
4579        #[expect(dead_code, reason = "compile-time type test")]
4580        fn cross_product_bounded_with_bounded_preserves_order<'a>(
4581            left: Stream<i32, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4582            right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4583        ) -> Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce> {
4584            left.cross_product(right)
4585        }
4586
4587        #[expect(dead_code, reason = "compile-time type test")]
4588        fn cross_product_unbounded_with_unbounded_is_no_order<'a>(
4589            left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4590            right: Stream<char, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4591        ) -> Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4592            left.cross_product(right)
4593        }
4594    } // mod join_ordering_type_tests
4595
4596    // === Runtime correctness tests for bounded join/cross_product ===
4597
4598    #[cfg(feature = "sim")]
4599    #[test]
4600    fn cross_product_mixed_boundedness_correctness() {
4601        use stageleft::q;
4602
4603        use crate::compile::builder::FlowBuilder;
4604        use crate::nondet::nondet;
4605
4606        let mut flow = FlowBuilder::new();
4607        let process = flow.process::<()>();
4608        let tick = process.tick();
4609
4610        let left = process.source_iter(q!(vec![1, 2]));
4611        let right = process
4612            .source_iter(q!(vec!['a', 'b']))
4613            .batch(&tick, nondet!(/** test */))
4614            .all_ticks();
4615
4616        let out = left.cross_product(right).sim_output();
4617
4618        flow.sim().exhaustive(async || {
4619            out.assert_yields_only_unordered(vec![(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')])
4620                .await;
4621        });
4622    }
4623
4624    #[cfg(feature = "sim")]
4625    #[test]
4626    fn join_mixed_boundedness_correctness() {
4627        use stageleft::q;
4628
4629        use crate::compile::builder::FlowBuilder;
4630        use crate::nondet::nondet;
4631
4632        let mut flow = FlowBuilder::new();
4633        let process = flow.process::<()>();
4634        let tick = process.tick();
4635
4636        let left = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
4637        let right = process
4638            .source_iter(q!(vec![(1, 'x'), (2, 'y')]))
4639            .batch(&tick, nondet!(/** test */))
4640            .all_ticks();
4641
4642        let out = left.join(right).sim_output();
4643
4644        flow.sim().exhaustive(async || {
4645            out.assert_yields_only_unordered(vec![(1, ('a', 'x')), (2, ('b', 'y'))])
4646                .await;
4647        });
4648    }
4649
4650    #[cfg(feature = "sim")]
4651    #[test]
4652    fn sim_merge_unordered_independent_atomics() {
4653        let mut flow = FlowBuilder::new();
4654        let node = flow.process::<()>();
4655
4656        let (in1_send, input1) = node.sim_input::<_, TotalOrder, _>();
4657        let (in2_send, input2) = node.sim_input::<_, TotalOrder, _>();
4658
4659        let out = input1
4660            .atomic()
4661            .merge_unordered(input2.atomic())
4662            .end_atomic()
4663            .sim_output();
4664
4665        flow.sim().exhaustive(async || {
4666            in1_send.send(1);
4667            in2_send.send(2);
4668
4669            out.assert_yields_only_unordered(vec![1, 2]).await;
4670        });
4671    }
4672
4673    #[cfg(feature = "deploy")]
4674    #[tokio::test]
4675    async fn test_stream_ref() {
4676        let mut deployment = Deployment::new();
4677
4678        let mut flow = FlowBuilder::new();
4679        let external = flow.external::<()>();
4680        let p1 = flow.process::<()>();
4681
4682        // Create a bounded stream (source_iter is bounded within a tick)
4683        let my_stream = p1.source_iter(q!(1..=5i32));
4684
4685        let stream_ref = my_stream.by_ref();
4686
4687        // Use the stream ref to get the vec's length
4688        let out_port = p1
4689            .source_iter(q!([()]))
4690            .map(q!(|_| stream_ref.len() as i32))
4691            .send_bincode_external(&external);
4692
4693        // Also consume the stream via pipe
4694        my_stream.for_each(q!(|_| {}));
4695
4696        let nodes = flow
4697            .with_default_optimize()
4698            .with_process(&p1, deployment.Localhost())
4699            .with_external(&external, deployment.Localhost())
4700            .deploy(&mut deployment);
4701
4702        deployment.deploy().await.unwrap();
4703
4704        let mut out_recv = nodes.connect(out_port).await;
4705
4706        deployment.start().await.unwrap();
4707
4708        let result = out_recv.next().await.unwrap();
4709        // stream has 5 elements
4710        assert_eq!(result, 5);
4711    }
4712
4713    #[cfg(feature = "deploy")]
4714    #[tokio::test]
4715    async fn test_stream_ref_contents() {
4716        let mut deployment = Deployment::new();
4717
4718        let mut flow = FlowBuilder::new();
4719        let external = flow.external::<()>();
4720        let p1 = flow.process::<()>();
4721
4722        // Create a bounded stream
4723        let my_stream = p1.source_iter(q!(1..=3i32));
4724
4725        let stream_ref = my_stream.by_ref();
4726
4727        // Sum the referenced vec's contents
4728        let out_port = p1
4729            .source_iter(q!([()]))
4730            .map(q!(|_| stream_ref.iter().sum::<i32>()))
4731            .send_bincode_external(&external);
4732
4733        my_stream.for_each(q!(|_| {}));
4734
4735        let nodes = flow
4736            .with_default_optimize()
4737            .with_process(&p1, deployment.Localhost())
4738            .with_external(&external, deployment.Localhost())
4739            .deploy(&mut deployment);
4740
4741        deployment.deploy().await.unwrap();
4742
4743        let mut out_recv = nodes.connect(out_port).await;
4744
4745        deployment.start().await.unwrap();
4746
4747        let result = out_recv.next().await.unwrap();
4748        // sum of 1+2+3 = 6
4749        assert_eq!(result, 6);
4750    }
4751
4752    #[cfg(feature = "deploy")]
4753    #[tokio::test]
4754    async fn test_stream_ref_no_consumer() {
4755        let mut deployment = Deployment::new();
4756
4757        let mut flow = FlowBuilder::new();
4758        let external = flow.external::<()>();
4759        let p1 = flow.process::<()>();
4760
4761        // Create a bounded stream — no pipe consumer, only ref
4762        let my_stream = p1.source_iter(q!(1..=4i32));
4763
4764        let stream_ref = my_stream.by_ref();
4765
4766        let out_port = p1
4767            .source_iter(q!([()]))
4768            .map(q!(|_| stream_ref.len() as i32))
4769            .send_bincode_external(&external);
4770
4771        let nodes = flow
4772            .with_default_optimize()
4773            .with_process(&p1, deployment.Localhost())
4774            .with_external(&external, deployment.Localhost())
4775            .deploy(&mut deployment);
4776
4777        deployment.deploy().await.unwrap();
4778
4779        let mut out_recv = nodes.connect(out_port).await;
4780
4781        deployment.start().await.unwrap();
4782
4783        let result = out_recv.next().await.unwrap();
4784        assert_eq!(result, 4);
4785    }
4786
4787    #[cfg(feature = "deploy")]
4788    #[tokio::test]
4789    async fn test_stream_mut() {
4790        let mut deployment = Deployment::new();
4791
4792        let mut flow = FlowBuilder::new();
4793        let external = flow.external::<()>();
4794        let p1 = flow.process::<()>();
4795
4796        // Create a bounded stream
4797        let my_stream = p1.source_iter(q!(1..=5i32));
4798
4799        let stream_mut = my_stream.by_mut();
4800
4801        // Mutably reference the buffer to retain only items > 3
4802        let out_port = p1
4803            .source_iter(q!([()]))
4804            .map(q!(|_| {
4805                stream_mut.retain(|x| *x > 3);
4806                stream_mut.len() as i32
4807            }))
4808            .send_bincode_external(&external);
4809
4810        my_stream.for_each(q!(|_| {}));
4811
4812        let nodes = flow
4813            .with_default_optimize()
4814            .with_process(&p1, deployment.Localhost())
4815            .with_external(&external, deployment.Localhost())
4816            .deploy(&mut deployment);
4817
4818        deployment.deploy().await.unwrap();
4819
4820        let mut out_recv = nodes.connect(out_port).await;
4821
4822        deployment.start().await.unwrap();
4823
4824        let result = out_recv.next().await.unwrap();
4825        // After retain(> 3): [4, 5] => len = 2
4826        assert_eq!(result, 2);
4827    }
4828
4829    /// A map with a mut singleton ref on an unordered input should produce > 1
4830    /// simulation instance because the ordering of elements through the mut closure
4831    /// is non-deterministic.
4832    #[cfg(feature = "sim")]
4833    #[test]
4834    fn sim_map_with_mut_on_unordered_explores_multiple_states() {
4835        use crate::live_collections::sliced::sliced;
4836        use crate::live_collections::stream::ExactlyOnce;
4837        use crate::properties::manual_proof;
4838
4839        let mut flow = FlowBuilder::new();
4840        let node = flow.process::<()>();
4841
4842        let (trigger_send, trigger) = node.sim_input::<i32, TotalOrder, ExactlyOnce>();
4843
4844        let out_recv = sliced! {
4845            let batch = use(trigger, nondet!(/** test */));
4846            let counter = batch.location().source_iter(q!(vec![0i32]))
4847                .fold(q!(|| 0i32), q!(|acc, v| *acc += v));
4848            let counter_mut = counter.by_mut();
4849            let items = batch.location().source_iter(q!(vec![1i32, 2])).weaken_ordering::<NoOrder>();
4850            items.map(q!(
4851                |x| {
4852                    *counter_mut += x;
4853                    *counter_mut
4854                },
4855                commutative = manual_proof!(/** test */)
4856            ))
4857        }
4858        .sim_output();
4859
4860        let count = flow.sim().exhaustive(async || {
4861            trigger_send.send(1);
4862            let _all: Vec<i32> = out_recv.collect_sorted().await;
4863        });
4864
4865        assert_eq!(
4866            count, 2,
4867            "Expected 2 simulation instances due to mut on unordered input, got {}",
4868            count
4869        );
4870    }
4871
4872    /// A map with a mut singleton ref on a top-level unordered input should produce > 1
4873    /// simulation instance. Currently panics because observe_nondet doesn't support
4874    /// top-level bounded inputs yet.
4875    #[cfg(feature = "sim")]
4876    #[test]
4877    #[ignore = "observe_nondet not yet supported for top-level bounded inputs (https://github.com/hydro-project/hydro/issues/2950)"]
4878    fn sim_map_with_mut_on_unordered_top_level() {
4879        use crate::properties::manual_proof;
4880
4881        let mut flow = FlowBuilder::new();
4882        let node = flow.process::<()>();
4883
4884        let counter = node
4885            .source_iter(q!(vec![0i32]))
4886            .fold(q!(|| 0i32), q!(|acc, v| *acc += v));
4887        let counter_mut = counter.by_mut();
4888
4889        let out_recv = node
4890            .source_iter(q!(vec![1i32, 2]))
4891            .weaken_ordering::<NoOrder>()
4892            .map(q!(
4893                |x| {
4894                    *counter_mut += x;
4895                    *counter_mut
4896                },
4897                commutative = manual_proof!(/** test */)
4898            ))
4899            .assume_ordering::<TotalOrder>(nondet!(/** test */))
4900            .sim_output();
4901
4902        counter.into_stream().for_each(q!(|_| {}));
4903
4904        let count = flow.sim().exhaustive(async || {
4905            let _all: Vec<i32> = out_recv.collect().await;
4906        });
4907
4908        assert_eq!(
4909            count, 2,
4910            "Expected 2 simulation instances due to mut on unordered input, got {}",
4911            count
4912        );
4913    }
4914}