hydro_lang/live_collections/keyed_singleton.rs
1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_stream::KeyedStream;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
17use crate::compile::builder::{CycleId, FlowState};
18use crate::compile::ir::{
19 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, SharedNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24use crate::live_collections::stream::{Ordering, Retries};
25#[cfg(stageleft_runtime)]
26use crate::location::dynamic::{DynLocation, LocationId};
27use crate::location::tick::DeferTick;
28use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
29use crate::manual_expr::ManualExpr;
30use crate::nondet::{NonDet, nondet};
31use crate::properties::manual_proof;
32
/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
///
/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
/// changed, but not removed), this also includes an additional variant [`BoundedValue`], which
/// indicates that entries may be added over time, but once an entry is added it will never be
/// removed and its value will never change.
///
/// A further variant, [`MonotonicValue`], indicates that once a key appears it will never be
/// removed and its value will only increase monotonically.
pub trait KeyedSingletonBound {
    /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
    type UnderlyingBound: Boundedness;
    /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
    type ValueBound: Boundedness;

    /// The type of the keyed singleton if the value for each key is immutable.
    ///
    /// The underlying bound is preserved, the value bound becomes [`Bounded`], and erasing
    /// monotonicity on the result is a no-op.
    type WithBoundedValue: KeyedSingletonBound<
            UnderlyingBound = Self::UnderlyingBound,
            ValueBound = Bounded,
            EraseMonotonic = Self::WithBoundedValue,
        >;

    /// The [`KeyedSingletonBound`] of a [`KeyedSingleton`] that is produced from a
    /// [`KeyedStream`] with [`Self`] boundedness. The underlying and value bounds are preserved.
    // NOTE(review): judging by the name this is used for monotone aggregations of a keyed
    // stream; confirm against the callers in `keyed_stream`.
    type KeyedStreamToMonotone: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;

    /// The type of the keyed singleton if the value for each key is no longer monotonic.
    /// The underlying and value bounds are preserved.
    type EraseMonotonic: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;

    /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
    fn bound_kind() -> KeyedSingletonBoundKind;
}
61
// Bound for keyed singletons whose underlying stream and per-key values are both `Unbounded`.
impl KeyedSingletonBound for Unbounded {
    type UnderlyingBound = Unbounded;
    type ValueBound = Unbounded;
    // Making each value immutable while keeping the underlying stream unbounded is exactly
    // what `BoundedValue` describes.
    type WithBoundedValue = BoundedValue;
    type KeyedStreamToMonotone = MonotonicValue;
    // Erasing monotonicity maps `Unbounded` to itself.
    type EraseMonotonic = Unbounded;

    // Runtime tag matching this marker type.
    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::Unbounded
    }
}
73
// Bound for keyed singletons whose underlying stream and per-key values are both `Bounded`.
// Every derived bound maps back to `Bounded`.
impl KeyedSingletonBound for Bounded {
    type UnderlyingBound = Bounded;
    type ValueBound = Bounded;
    type WithBoundedValue = Bounded;
    type KeyedStreamToMonotone = Bounded;
    type EraseMonotonic = Bounded;

    // Runtime tag matching this marker type.
    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::Bounded
    }
}
85
/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
/// its value is bounded and will never change, but new entries may appear asynchronously.
pub struct BoundedValue;

// The underlying stream is `Unbounded` (new keys may arrive) while each value is `Bounded`.
// Every derived bound maps back to `BoundedValue`.
impl KeyedSingletonBound for BoundedValue {
    type UnderlyingBound = Unbounded;
    type ValueBound = Bounded;
    type WithBoundedValue = BoundedValue;
    type KeyedStreamToMonotone = BoundedValue;
    type EraseMonotonic = BoundedValue;

    // Runtime tag matching this marker type.
    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::BoundedValue
    }
}
101
/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
/// it will never be removed, and the corresponding value will only increase monotonically.
pub struct MonotonicValue;

// Both the underlying stream and the per-key values are `Unbounded`; monotonicity is tracked
// only through this marker type.
impl KeyedSingletonBound for MonotonicValue {
    type UnderlyingBound = Unbounded;
    type ValueBound = Unbounded;
    type WithBoundedValue = BoundedValue;
    type KeyedStreamToMonotone = MonotonicValue;
    // Dropping the monotonicity guarantee leaves a plain `Unbounded` keyed singleton.
    type EraseMonotonic = Unbounded;

    // Runtime tag matching this marker type.
    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::MonotonicValue
    }
}
117
/// Mapping from keys of type `K` to values of type `V`.
///
/// Keyed Singletons capture an asynchronously updated mapping from keys of type `K` to values of
/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
/// keys cannot be removed and the value for each key is immutable.
///
/// Type Parameters:
/// - `K`: the type of the key for each entry
/// - `V`: the type of the value for each entry
/// - `Loc`: the [`Location`] where the keyed singleton is materialized
/// - `Bound`: tracks whether the entries are:
///     - [`Bounded`] (local and finite)
///     - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
///     - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
///     - [`MonotonicValue`] (asynchronous with no removals and monotonically increasing values)
pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
    // The location where this keyed singleton is materialized.
    pub(crate) location: Loc,
    // The IR node producing this collection. Operators take it out of the cell (leaving
    // `HydroNode::Placeholder` behind) when they consume the keyed singleton.
    pub(crate) ir_node: RefCell<HydroNode>,
    // Handle to the flow state; the `Drop` impl uses it to register a still-owned node as a root.
    pub(crate) flow_state: FlowState,

    // Ties the otherwise unused type parameters to the struct.
    _phantom: PhantomData<(K, V, Loc, Bound)>,
}
141
142impl<K, V, L, B: KeyedSingletonBound> Drop for KeyedSingleton<K, V, L, B> {
143 fn drop(&mut self) {
144 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
145 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
146 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
147 input: Box::new(ir_node),
148 op_metadata: HydroIrOpMetadata::new(),
149 });
150 }
151 }
152}
153
154impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
155 for KeyedSingleton<K, V, Loc, Bound>
156{
157 fn clone(&self) -> Self {
158 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
159 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
160 *self.ir_node.borrow_mut() = HydroNode::Tee {
161 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
162 metadata: self.location.new_node_metadata(Self::collection_kind()),
163 };
164 }
165
166 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
167 KeyedSingleton {
168 location: self.location.clone(),
169 flow_state: self.flow_state.clone(),
170 ir_node: HydroNode::Tee {
171 inner: SharedNode(inner.0.clone()),
172 metadata: metadata.clone(),
173 }
174 .into(),
175 _phantom: PhantomData,
176 }
177 } else {
178 unreachable!()
179 }
180 }
181}
182
183impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
184 for KeyedSingleton<K, V, L, B>
185where
186 L: Location<'a> + NoTick,
187{
188 type Location = L;
189
190 fn create_source(cycle_id: CycleId, location: L) -> Self {
191 KeyedSingleton {
192 flow_state: location.flow_state().clone(),
193 location: location.clone(),
194 ir_node: RefCell::new(HydroNode::CycleSource {
195 cycle_id,
196 metadata: location.new_node_metadata(Self::collection_kind()),
197 }),
198 _phantom: PhantomData,
199 }
200 }
201}
202
203impl<'a, K, V, L> CycleCollection<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
204where
205 L: Location<'a>,
206{
207 type Location = Tick<L>;
208
209 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
210 KeyedSingleton::new(
211 location.clone(),
212 HydroNode::CycleSource {
213 cycle_id,
214 metadata: location.new_node_metadata(Self::collection_kind()),
215 },
216 )
217 }
218}
219
// Lets bounded, in-tick keyed singletons be used wherever a `DeferTick` collection is required.
impl<'a, K, V, L> DeferTick for KeyedSingleton<K, V, Tick<L>, Bounded>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // NOTE(review): presumably resolves to an inherent `KeyedSingleton::defer_tick` defined
        // outside the visible part of this file (inherent associated functions take precedence
        // over trait methods in path resolution); confirm that it exists, otherwise this call
        // would recurse.
        KeyedSingleton::defer_tick(self)
    }
}
228
229impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
230 for KeyedSingleton<K, V, L, B>
231where
232 L: Location<'a> + NoTick,
233{
234 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
235 assert_eq!(
236 Location::id(&self.location),
237 expected_location,
238 "locations do not match"
239 );
240 self.location
241 .flow_state()
242 .borrow_mut()
243 .push_root(HydroRoot::CycleSink {
244 cycle_id,
245 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
246 op_metadata: HydroIrOpMetadata::new(),
247 });
248 }
249}
250
251impl<'a, K, V, L> ReceiverComplete<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
252where
253 L: Location<'a>,
254{
255 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
256 assert_eq!(
257 Location::id(&self.location),
258 expected_location,
259 "locations do not match"
260 );
261 self.location
262 .flow_state()
263 .borrow_mut()
264 .push_root(HydroRoot::CycleSink {
265 cycle_id,
266 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
267 op_metadata: HydroIrOpMetadata::new(),
268 });
269 }
270}
271
272impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
273 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
274 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
275 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
276
277 let flow_state = location.flow_state().clone();
278 KeyedSingleton {
279 location,
280 flow_state,
281 ir_node: RefCell::new(ir_node),
282 _phantom: PhantomData,
283 }
284 }
285
286 /// Returns the [`Location`] where this keyed singleton is being materialized.
287 pub fn location(&self) -> &L {
288 &self.location
289 }
290}
291
// Counts the entries of a bounded keyed singleton. Used by `KeyedSingleton::key_count` on a
// per-tick snapshot of an unbounded keyed singleton.
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded> {
    let entries = me.entries();
    entries.count()
}
298
// Collects the entries of a bounded keyed singleton into a single `HashMap`. Used by
// `KeyedSingleton::into_singleton` on a per-tick snapshot of an unbounded keyed singleton.
#[cfg(stageleft_runtime)]
fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<HashMap<K, V>, L, Bounded>
where
    K: Eq + Hash,
{
    // The fold below needs an ordering guarantee on its input; we assert one here.
    let ordered_entries = me.entries().assume_ordering_trusted(nondet!(
        /// There is only one element associated with each key. The closure technically
        /// isn't commutative in the case where both passed entries have the same key
        /// but different values.
        ///
        /// In the future, we may want to have an `assume!(...)` statement in the UDF that
        /// the key is never already present in the map.
    ));

    ordered_entries.fold(
        q!(|| HashMap::new()),
        q!(|map, (k, v)| {
            map.insert(k, v);
        }),
    )
}
322
323impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
324 pub(crate) fn collection_kind() -> CollectionKind {
325 CollectionKind::KeyedSingleton {
326 bound: B::bound_kind(),
327 key_type: stageleft::quote_type::<K>().into(),
328 value_type: stageleft::quote_type::<V>().into(),
329 }
330 }
331
332 /// Transforms each value by invoking `f` on each element, with keys staying the same
333 /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
334 ///
335 /// If you do not want to modify the stream and instead only want to view
336 /// each item use [`KeyedSingleton::inspect`] instead.
337 ///
338 /// # Example
339 /// ```rust
340 /// # #[cfg(feature = "deploy")] {
341 /// # use hydro_lang::prelude::*;
342 /// # use futures::StreamExt;
343 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
344 /// let keyed_singleton = // { 1: 2, 2: 4 }
345 /// # process
346 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
347 /// # .into_keyed()
348 /// # .first();
349 /// keyed_singleton.map(q!(|v| v + 1))
350 /// # .entries()
351 /// # }, |mut stream| async move {
352 /// // { 1: 3, 2: 5 }
353 /// # let mut results = Vec::new();
354 /// # for _ in 0..2 {
355 /// # results.push(stream.next().await.unwrap());
356 /// # }
357 /// # results.sort();
358 /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
359 /// # }));
360 /// # }
361 /// ```
362 pub fn map<U, F>(
363 self,
364 f: impl IntoQuotedMut<'a, F, L> + Copy,
365 ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
366 where
367 F: Fn(V) -> U + 'a,
368 {
369 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
370 let map_f = q!({
371 let orig = f;
372 move |(k, v)| (k, orig(v))
373 })
374 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
375 .into();
376
377 KeyedSingleton::new(
378 self.location.clone(),
379 HydroNode::Map {
380 f: map_f,
381 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
382 metadata: self.location.new_node_metadata(KeyedSingleton::<
383 K,
384 U,
385 L,
386 B::EraseMonotonic,
387 >::collection_kind()),
388 },
389 )
390 }
391
392 /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
393 /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
394 ///
395 /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
396 /// the new value `U`. The key remains unchanged in the output.
397 ///
398 /// # Example
399 /// ```rust
400 /// # #[cfg(feature = "deploy")] {
401 /// # use hydro_lang::prelude::*;
402 /// # use futures::StreamExt;
403 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
404 /// let keyed_singleton = // { 1: 2, 2: 4 }
405 /// # process
406 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
407 /// # .into_keyed()
408 /// # .first();
409 /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
410 /// # .entries()
411 /// # }, |mut stream| async move {
412 /// // { 1: 3, 2: 6 }
413 /// # let mut results = Vec::new();
414 /// # for _ in 0..2 {
415 /// # results.push(stream.next().await.unwrap());
416 /// # }
417 /// # results.sort();
418 /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
419 /// # }));
420 /// # }
421 /// ```
422 pub fn map_with_key<U, F>(
423 self,
424 f: impl IntoQuotedMut<'a, F, L> + Copy,
425 ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
426 where
427 F: Fn((K, V)) -> U + 'a,
428 K: Clone,
429 {
430 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
431 let map_f = q!({
432 let orig = f;
433 move |(k, v)| {
434 let out = orig((Clone::clone(&k), v));
435 (k, out)
436 }
437 })
438 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
439 .into();
440
441 KeyedSingleton::new(
442 self.location.clone(),
443 HydroNode::Map {
444 f: map_f,
445 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
446 metadata: self.location.new_node_metadata(KeyedSingleton::<
447 K,
448 U,
449 L,
450 B::EraseMonotonic,
451 >::collection_kind()),
452 },
453 )
454 }
455
456 /// Gets the number of keys in the keyed singleton.
457 ///
458 /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
459 /// since keys may be added / removed over time. When the set of keys changes, the count will
460 /// be asynchronously updated.
461 ///
462 /// # Example
463 /// ```rust
464 /// # #[cfg(feature = "deploy")] {
465 /// # use hydro_lang::prelude::*;
466 /// # use futures::StreamExt;
467 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
468 /// # let tick = process.tick();
469 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
470 /// # process
471 /// # .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
472 /// # .into_keyed()
473 /// # .batch(&tick, nondet!(/** test */))
474 /// # .first();
475 /// keyed_singleton.key_count()
476 /// # .all_ticks()
477 /// # }, |mut stream| async move {
478 /// // 3
479 /// # assert_eq!(stream.next().await.unwrap(), 3);
480 /// # }));
481 /// # }
482 /// ```
483 pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
484 if B::ValueBound::BOUNDED {
485 let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
486 location: self.location.clone(),
487 flow_state: self.flow_state.clone(),
488 ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
489 _phantom: PhantomData,
490 };
491
492 me.entries().count().ignore_monotonic()
493 } else if L::is_top_level()
494 && let Some(tick) = self.location.try_tick()
495 && B::bound_kind() == KeyedSingletonBoundKind::Unbounded
496 {
497 let me: KeyedSingleton<K, V, L, Unbounded> = KeyedSingleton::new(
498 self.location.clone(),
499 self.ir_node.replace(HydroNode::Placeholder),
500 );
501
502 let out =
503 key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
504 .latest();
505 Singleton::new(
506 out.location.clone(),
507 out.ir_node.replace(HydroNode::Placeholder),
508 )
509 } else {
510 panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
511 }
512 }
513
514 /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
515 ///
516 /// As the values for each key are updated asynchronously, the `HashMap` will be updated
517 /// asynchronously as well.
518 ///
519 /// # Example
520 /// ```rust
521 /// # #[cfg(feature = "deploy")] {
522 /// # use hydro_lang::prelude::*;
523 /// # use futures::StreamExt;
524 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
525 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
526 /// # process
527 /// # .source_iter(q!(vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())]))
528 /// # .into_keyed()
529 /// # .batch(&process.tick(), nondet!(/** test */))
530 /// # .first();
531 /// keyed_singleton.into_singleton()
532 /// # .all_ticks()
533 /// # }, |mut stream| async move {
534 /// // { 1: "a", 2: "b", 3: "c" }
535 /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())].into_iter().collect());
536 /// # }));
537 /// # }
538 /// ```
539 pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
540 where
541 K: Eq + Hash,
542 {
543 if B::ValueBound::BOUNDED {
544 let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
545 location: self.location.clone(),
546 flow_state: self.flow_state.clone(),
547 ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
548 _phantom: PhantomData,
549 };
550
551 me.entries()
552 .assume_ordering_trusted(nondet!(
553 /// There is only one element associated with each key. The closure technically
554 /// isn't commutative in the case where both passed entries have the same key
555 /// but different values.
556 ///
557 /// In the future, we may want to have an `assume!(...)` statement in the UDF that
558 /// the key is never already present in the map.
559 ))
560 .fold(
561 q!(|| HashMap::new()),
562 q!(|map, (k, v)| {
563 // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
564 map.insert(k, v);
565 }),
566 )
567 } else if L::is_top_level()
568 && let Some(tick) = self.location.try_tick()
569 && B::bound_kind() == KeyedSingletonBoundKind::Unbounded
570 {
571 let me: KeyedSingleton<K, V, L, Unbounded> = KeyedSingleton::new(
572 self.location.clone(),
573 self.ir_node.replace(HydroNode::Placeholder),
574 );
575
576 let out = into_singleton_inside_tick(
577 me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
578 )
579 .latest();
580 Singleton::new(
581 out.location.clone(),
582 out.ir_node.replace(HydroNode::Placeholder),
583 )
584 } else {
585 panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
586 }
587 }
588
589 /// An operator which allows you to "name" a `HydroNode`.
590 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
591 pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
592 {
593 let mut node = self.ir_node.borrow_mut();
594 let metadata = node.metadata_mut();
595 metadata.tag = Some(name.to_owned());
596 }
597 self
598 }
599
600 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
601 /// implies that `B == Bounded`.
602 pub fn make_bounded(self) -> KeyedSingleton<K, V, L, Bounded>
603 where
604 B: IsBounded,
605 {
606 KeyedSingleton::new(
607 self.location.clone(),
608 self.ir_node.replace(HydroNode::Placeholder),
609 )
610 }
611
612 /// Gets the value associated with a specific key from the keyed singleton.
613 /// Returns `None` if the key is `None` or there is no associated value.
614 ///
615 /// # Example
616 /// ```rust
617 /// # #[cfg(feature = "deploy")] {
618 /// # use hydro_lang::prelude::*;
619 /// # use futures::StreamExt;
620 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
621 /// let tick = process.tick();
622 /// let keyed_data = process
623 /// .source_iter(q!(vec![(1, 2), (2, 3)]))
624 /// .into_keyed()
625 /// .batch(&tick, nondet!(/** test */))
626 /// .first();
627 /// let key = tick.singleton(q!(1));
628 /// keyed_data.get(key).all_ticks()
629 /// # }, |mut stream| async move {
630 /// // 2
631 /// # assert_eq!(stream.next().await.unwrap(), 2);
632 /// # }));
633 /// # }
634 /// ```
635 pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Optional<V, L, Bounded>
636 where
637 B: IsBounded,
638 K: Hash + Eq + Clone,
639 V: Clone,
640 {
641 self.make_bounded()
642 .into_keyed_stream()
643 .get(key)
644 .cast_at_most_one_element()
645 }
646
647 /// Emit a keyed stream containing keys shared between the keyed singleton and the
648 /// keyed stream, where each value in the output keyed stream is a tuple of
649 /// (the keyed singleton's value, the keyed stream's value).
650 ///
651 /// # Example
652 /// ```rust
653 /// # #[cfg(feature = "deploy")] {
654 /// # use hydro_lang::prelude::*;
655 /// # use futures::StreamExt;
656 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
657 /// let tick = process.tick();
658 /// let keyed_data = process
659 /// .source_iter(q!(vec![(1, 10), (2, 20)]))
660 /// .into_keyed()
661 /// .batch(&tick, nondet!(/** test */))
662 /// .first();
663 /// let other_data = process
664 /// .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
665 /// .into_keyed()
666 /// .batch(&tick, nondet!(/** test */));
667 /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
668 /// # }, |mut stream| async move {
669 /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
670 /// # let mut results = vec![];
671 /// # for _ in 0..3 {
672 /// # results.push(stream.next().await.unwrap());
673 /// # }
674 /// # results.sort();
675 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
676 /// # }));
677 /// # }
678 /// ```
679 pub fn join_keyed_stream<O2: Ordering, R2: Retries, V2, B2: Boundedness>(
680 self,
681 other: KeyedStream<K, V2, L, B2, O2, R2>,
682 ) -> KeyedStream<K, (V, V2), L, B2, O2, R2>
683 where
684 B: IsBounded,
685 K: Eq + Hash + Clone,
686 V: Clone,
687 V2: Clone,
688 {
689 // TODO(shadaj): if DFIR guarantees that joining unbounded keyed stream x bounded keyed stream
690 // always produces deterministic order per key (nested loop join), this could just use
691 // `join_keyed_stream` without constructing IRs manually
692 KeyedStream::new(
693 self.location.clone(),
694 HydroNode::Join {
695 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
696 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
697 metadata: self
698 .location
699 .new_node_metadata(KeyedStream::<K, (V, V2), L, B2, O2, R2>::collection_kind()),
700 },
701 )
702 }
703
704 /// Emit a keyed singleton containing all keys shared between two keyed singletons,
705 /// where each value in the output keyed singleton is a tuple of
706 /// (self.value, other.value).
707 ///
708 /// # Example
709 /// ```rust
710 /// # #[cfg(feature = "deploy")] {
711 /// # use hydro_lang::prelude::*;
712 /// # use futures::StreamExt;
713 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
714 /// # let tick = process.tick();
715 /// let requests = // { 1: 10, 2: 20, 3: 30 }
716 /// # process
717 /// # .source_iter(q!(vec![(1, 10), (2, 20), (3, 30)]))
718 /// # .into_keyed()
719 /// # .batch(&tick, nondet!(/** test */))
720 /// # .first();
721 /// let other = // { 1: 100, 2: 200, 4: 400 }
722 /// # process
723 /// # .source_iter(q!(vec![(1, 100), (2, 200), (4, 400)]))
724 /// # .into_keyed()
725 /// # .batch(&tick, nondet!(/** test */))
726 /// # .first();
727 /// requests.join_keyed_singleton(other)
728 /// # .entries().all_ticks()
729 /// # }, |mut stream| async move {
730 /// // { 1: (10, 100), 2: (20, 200) }
731 /// # let mut results = vec![];
732 /// # for _ in 0..2 {
733 /// # results.push(stream.next().await.unwrap());
734 /// # }
735 /// # results.sort();
736 /// # assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
737 /// # }));
738 /// # }
739 /// ```
740 pub fn join_keyed_singleton<V2: Clone>(
741 self,
742 other: KeyedSingleton<K, V2, L, Bounded>,
743 ) -> KeyedSingleton<K, (V, V2), L, Bounded>
744 where
745 B: IsBounded,
746 K: Eq + Hash + Clone,
747 V: Clone,
748 {
749 let result_stream = self
750 .make_bounded()
751 .entries()
752 .join(other.entries())
753 .into_keyed();
754
755 // The cast is guaranteed to succeed, since each key (in both `self` and `other`) has at most one value.
756 result_stream.cast_at_most_one_entry_per_key()
757 }
758
759 /// For each value in `self`, find the matching key in `lookup`.
760 /// The output is a keyed singleton with the key from `self`, and a value
761 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
762 /// If the key is not present in `lookup`, the option will be [`None`].
763 ///
764 /// # Example
765 /// ```rust
766 /// # #[cfg(feature = "deploy")] {
767 /// # use hydro_lang::prelude::*;
768 /// # use futures::StreamExt;
769 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
770 /// # let tick = process.tick();
771 /// let requests = // { 1: 10, 2: 20 }
772 /// # process
773 /// # .source_iter(q!(vec![(1, 10), (2, 20)]))
774 /// # .into_keyed()
775 /// # .batch(&tick, nondet!(/** test */))
776 /// # .first();
777 /// let other_data = // { 10: 100, 11: 110 }
778 /// # process
779 /// # .source_iter(q!(vec![(10, 100), (11, 110)]))
780 /// # .into_keyed()
781 /// # .batch(&tick, nondet!(/** test */))
782 /// # .first();
783 /// requests.lookup_keyed_singleton(other_data)
784 /// # .entries().all_ticks()
785 /// # }, |mut stream| async move {
786 /// // { 1: (10, Some(100)), 2: (20, None) }
787 /// # let mut results = vec![];
788 /// # for _ in 0..2 {
789 /// # results.push(stream.next().await.unwrap());
790 /// # }
791 /// # results.sort();
792 /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
793 /// # }));
794 /// # }
795 /// ```
796 pub fn lookup_keyed_singleton<V2>(
797 self,
798 lookup: KeyedSingleton<V, V2, L, Bounded>,
799 ) -> KeyedSingleton<K, (V, Option<V2>), L, Bounded>
800 where
801 B: IsBounded,
802 K: Eq + Hash + Clone,
803 V: Eq + Hash + Clone,
804 V2: Clone,
805 {
806 let result_stream = self
807 .make_bounded()
808 .into_keyed_stream()
809 .lookup_keyed_stream(lookup.into_keyed_stream());
810
811 // The cast is guaranteed to succeed since both lookup and self contain at most 1 value per key
812 result_stream.cast_at_most_one_entry_per_key()
813 }
814
815 /// For each value in `self`, find the matching key in `lookup`.
816 /// The output is a keyed stream with the key from `self`, and a value
817 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
818 /// If the key is not present in `lookup`, the option will be [`None`].
819 ///
820 /// # Example
821 /// ```rust
822 /// # #[cfg(feature = "deploy")] {
823 /// # use hydro_lang::prelude::*;
824 /// # use futures::StreamExt;
825 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
826 /// # let tick = process.tick();
827 /// let requests = // { 1: 10, 2: 20 }
828 /// # process
829 /// # .source_iter(q!(vec![(1, 10), (2, 20)]))
830 /// # .into_keyed()
831 /// # .batch(&tick, nondet!(/** test */))
832 /// # .first();
833 /// let other_data = // { 10: 100, 10: 110 }
834 /// # process
835 /// # .source_iter(q!(vec![(10, 100), (10, 110)]))
836 /// # .into_keyed()
837 /// # .batch(&tick, nondet!(/** test */));
838 /// requests.lookup_keyed_stream(other_data)
839 /// # .entries().all_ticks()
840 /// # }, |mut stream| async move {
841 /// // { 1: [(10, Some(100)), (10, Some(110))], 2: (20, None) }
842 /// # let mut results = vec![];
843 /// # for _ in 0..3 {
844 /// # results.push(stream.next().await.unwrap());
845 /// # }
846 /// # results.sort();
847 /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(110))), (2, (20, None))]);
848 /// # }));
849 /// # }
850 /// ```
851 pub fn lookup_keyed_stream<V2, O: Ordering, R: Retries>(
852 self,
853 lookup: KeyedStream<V, V2, L, Bounded, O, R>,
854 ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
855 where
856 B: IsBounded,
857 K: Eq + Hash + Clone,
858 V: Eq + Hash + Clone,
859 V2: Clone,
860 {
861 self.make_bounded()
862 .entries()
863 .weaken_retries::<R>() // TODO: Once weaken_retries() is implemented for KeyedSingleton, remove entries() and into_keyed()
864 .into_keyed()
865 .lookup_keyed_stream(lookup)
866 }
867}
868
869impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
870 KeyedSingleton<K, V, L, B>
871{
872 /// Flattens the keyed singleton into an unordered stream of key-value pairs.
873 ///
874 /// The value for each key must be bounded, otherwise the resulting stream elements would be
875 /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
876 /// into the output.
877 ///
878 /// # Example
879 /// ```rust
880 /// # #[cfg(feature = "deploy")] {
881 /// # use hydro_lang::prelude::*;
882 /// # use futures::StreamExt;
883 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
884 /// let keyed_singleton = // { 1: 2, 2: 4 }
885 /// # process
886 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
887 /// # .into_keyed()
888 /// # .first();
889 /// keyed_singleton.entries()
890 /// # }, |mut stream| async move {
891 /// // (1, 2), (2, 4) in any order
892 /// # let mut results = Vec::new();
893 /// # for _ in 0..2 {
894 /// # results.push(stream.next().await.unwrap());
895 /// # }
896 /// # results.sort();
897 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
898 /// # }));
899 /// # }
900 /// ```
901 pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
902 self.into_keyed_stream().entries()
903 }
904
905 /// Flattens the keyed singleton into an unordered stream of just the values.
906 ///
907 /// The value for each key must be bounded, otherwise the resulting stream elements would be
908 /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
909 /// into the output.
910 ///
911 /// # Example
912 /// ```rust
913 /// # #[cfg(feature = "deploy")] {
914 /// # use hydro_lang::prelude::*;
915 /// # use futures::StreamExt;
916 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
917 /// let keyed_singleton = // { 1: 2, 2: 4 }
918 /// # process
919 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
920 /// # .into_keyed()
921 /// # .first();
922 /// keyed_singleton.values()
923 /// # }, |mut stream| async move {
924 /// // 2, 4 in any order
925 /// # let mut results = Vec::new();
926 /// # for _ in 0..2 {
927 /// # results.push(stream.next().await.unwrap());
928 /// # }
929 /// # results.sort();
930 /// # assert_eq!(results, vec![2, 4]);
931 /// # }));
932 /// # }
933 /// ```
934 pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
935 let map_f = q!(|(_, v)| v)
936 .splice_fn1_ctx::<(K, V), V>(&self.location)
937 .into();
938
939 Stream::new(
940 self.location.clone(),
941 HydroNode::Map {
942 f: map_f,
943 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
944 metadata: self.location.new_node_metadata(Stream::<
945 V,
946 L,
947 B::UnderlyingBound,
948 NoOrder,
949 ExactlyOnce,
950 >::collection_kind()),
951 },
952 )
953 }
954
955 /// Flattens the keyed singleton into an unordered stream of just the keys.
956 ///
957 /// The value for each key must be bounded, otherwise the removal of keys would result in
958 /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
959 /// into the output.
960 ///
961 /// # Example
962 /// ```rust
963 /// # #[cfg(feature = "deploy")] {
964 /// # use hydro_lang::prelude::*;
965 /// # use futures::StreamExt;
966 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
967 /// let keyed_singleton = // { 1: 2, 2: 4 }
968 /// # process
969 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
970 /// # .into_keyed()
971 /// # .first();
972 /// keyed_singleton.keys()
973 /// # }, |mut stream| async move {
974 /// // 1, 2 in any order
975 /// # let mut results = Vec::new();
976 /// # for _ in 0..2 {
977 /// # results.push(stream.next().await.unwrap());
978 /// # }
979 /// # results.sort();
980 /// # assert_eq!(results, vec![1, 2]);
981 /// # }));
982 /// # }
983 /// ```
984 pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
985 self.entries().map(q!(|(k, _)| k))
986 }
987
988 /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
989 /// entries whose keys are not in the provided stream.
990 ///
991 /// # Example
992 /// ```rust
993 /// # #[cfg(feature = "deploy")] {
994 /// # use hydro_lang::prelude::*;
995 /// # use futures::StreamExt;
996 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
997 /// let tick = process.tick();
998 /// let keyed_singleton = // { 1: 2, 2: 4 }
999 /// # process
1000 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1001 /// # .into_keyed()
1002 /// # .first()
1003 /// # .batch(&tick, nondet!(/** test */));
1004 /// let keys_to_remove = process
1005 /// .source_iter(q!(vec![1]))
1006 /// .batch(&tick, nondet!(/** test */));
1007 /// keyed_singleton.filter_key_not_in(keys_to_remove)
1008 /// # .entries().all_ticks()
1009 /// # }, |mut stream| async move {
1010 /// // { 2: 4 }
1011 /// # for w in vec![(2, 4)] {
1012 /// # assert_eq!(stream.next().await.unwrap(), w);
1013 /// # }
1014 /// # }));
1015 /// # }
1016 /// ```
1017 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1018 self,
1019 other: Stream<K, L, Bounded, O2, R2>,
1020 ) -> Self
1021 where
1022 K: Hash + Eq,
1023 {
1024 check_matching_location(&self.location, &other.location);
1025
1026 KeyedSingleton::new(
1027 self.location.clone(),
1028 HydroNode::AntiJoin {
1029 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1030 neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1031 metadata: self.location.new_node_metadata(Self::collection_kind()),
1032 },
1033 )
1034 }
1035
1036 /// An operator which allows you to "inspect" each value of a keyed singleton without
1037 /// modifying it. The closure `f` is called on a reference to each value. This is
1038 /// mainly useful for debugging, and should not be used to generate side-effects.
1039 ///
1040 /// # Example
1041 /// ```rust
1042 /// # #[cfg(feature = "deploy")] {
1043 /// # use hydro_lang::prelude::*;
1044 /// # use futures::StreamExt;
1045 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1046 /// let keyed_singleton = // { 1: 2, 2: 4 }
1047 /// # process
1048 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1049 /// # .into_keyed()
1050 /// # .first();
1051 /// keyed_singleton
1052 /// .inspect(q!(|v| println!("{}", v)))
1053 /// # .entries()
1054 /// # }, |mut stream| async move {
1055 /// // { 1: 2, 2: 4 }
1056 /// # for w in vec![(1, 2), (2, 4)] {
1057 /// # assert_eq!(stream.next().await.unwrap(), w);
1058 /// # }
1059 /// # }));
1060 /// # }
1061 /// ```
1062 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1063 where
1064 F: Fn(&V) + 'a,
1065 {
1066 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1067 let inspect_f = q!({
1068 let orig = f;
1069 move |t: &(_, _)| orig(&t.1)
1070 })
1071 .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1072 .into();
1073
1074 KeyedSingleton::new(
1075 self.location.clone(),
1076 HydroNode::Inspect {
1077 f: inspect_f,
1078 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1079 metadata: self.location.new_node_metadata(Self::collection_kind()),
1080 },
1081 )
1082 }
1083
1084 /// An operator which allows you to "inspect" each entry of a keyed singleton without
1085 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1086 /// mainly useful for debugging, and should not be used to generate side-effects.
1087 ///
1088 /// # Example
1089 /// ```rust
1090 /// # #[cfg(feature = "deploy")] {
1091 /// # use hydro_lang::prelude::*;
1092 /// # use futures::StreamExt;
1093 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1094 /// let keyed_singleton = // { 1: 2, 2: 4 }
1095 /// # process
1096 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1097 /// # .into_keyed()
1098 /// # .first();
1099 /// keyed_singleton
1100 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1101 /// # .entries()
1102 /// # }, |mut stream| async move {
1103 /// // { 1: 2, 2: 4 }
1104 /// # for w in vec![(1, 2), (2, 4)] {
1105 /// # assert_eq!(stream.next().await.unwrap(), w);
1106 /// # }
1107 /// # }));
1108 /// # }
1109 /// ```
1110 pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1111 where
1112 F: Fn(&(K, V)) + 'a,
1113 {
1114 let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1115
1116 KeyedSingleton::new(
1117 self.location.clone(),
1118 HydroNode::Inspect {
1119 f: inspect_f,
1120 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1121 metadata: self.location.new_node_metadata(Self::collection_kind()),
1122 },
1123 )
1124 }
1125
1126 /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
1127 ///
1128 /// Because this method requires values to be bounded, the output [`Optional`] will only be
1129 /// asynchronously updated if a new key is added that is higher than the previous max key.
1130 ///
1131 /// # Example
1132 /// ```rust
1133 /// # #[cfg(feature = "deploy")] {
1134 /// # use hydro_lang::prelude::*;
1135 /// # use futures::StreamExt;
1136 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1137 /// let tick = process.tick();
1138 /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
1139 /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 123), (2, 456), (0, 789)])))
1140 /// # .into_keyed()
1141 /// # .first();
1142 /// keyed_singleton.get_max_key()
1143 /// # .sample_eager(nondet!(/** test */))
1144 /// # }, |mut stream| async move {
1145 /// // (2, 456)
1146 /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
1147 /// # }));
1148 /// # }
1149 /// ```
1150 pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
1151 where
1152 K: Ord,
1153 {
1154 self.entries()
1155 .assume_ordering_trusted(nondet!(
1156 /// There is only one element associated with each key, and the keys are totallly
1157 /// ordered so we will produce a deterministic value. The closure technically
1158 /// isn't commutative in the case where both passed entries have the same key
1159 /// but different values.
1160 ///
1161 /// In the future, we may want to have an `assume!(...)` statement in the UDF that
1162 /// the two inputs do not have the same key.
1163 ))
1164 .reduce(q!(
1165 move |curr, new| {
1166 if new.0 > curr.0 {
1167 *curr = new;
1168 }
1169 },
1170 idempotent = manual_proof!(/** repeated elements are ignored */)
1171 ))
1172 }
1173
1174 /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
1175 /// element, the value.
1176 ///
1177 /// This is the equivalent of [`Singleton::into_stream`] but keyed.
1178 ///
1179 /// # Example
1180 /// ```rust
1181 /// # #[cfg(feature = "deploy")] {
1182 /// # use hydro_lang::prelude::*;
1183 /// # use futures::StreamExt;
1184 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1185 /// let keyed_singleton = // { 1: 2, 2: 4 }
1186 /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 2), (2, 4)])))
1187 /// # .into_keyed()
1188 /// # .first();
1189 /// keyed_singleton
1190 /// .clone()
1191 /// .into_keyed_stream()
1192 /// .merge_unordered(
1193 /// keyed_singleton.into_keyed_stream()
1194 /// )
1195 /// # .entries()
1196 /// # }, |mut stream| async move {
1197 /// /// // { 1: [2, 2], 2: [4, 4] }
1198 /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
1199 /// # assert_eq!(stream.next().await.unwrap(), w);
1200 /// # }
1201 /// # }));
1202 /// # }
1203 /// ```
1204 pub fn into_keyed_stream(
1205 self,
1206 ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
1207 KeyedStream::new(
1208 self.location.clone(),
1209 HydroNode::Cast {
1210 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1211 metadata: self.location.new_node_metadata(KeyedStream::<
1212 K,
1213 V,
1214 L,
1215 B::UnderlyingBound,
1216 TotalOrder,
1217 ExactlyOnce,
1218 >::collection_kind()),
1219 },
1220 )
1221 }
1222}
1223
1224impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1225where
1226 L: Location<'a>,
1227{
1228 /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1229 /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1230 ///
1231 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1232 /// processed before an acknowledgement is emitted.
1233 pub fn atomic(self) -> KeyedSingleton<K, V, Atomic<L>, B> {
1234 let id = self.location.flow_state().borrow_mut().next_clock_id();
1235 let out_location = Atomic {
1236 tick: Tick {
1237 id,
1238 l: self.location.clone(),
1239 },
1240 };
1241 KeyedSingleton::new(
1242 out_location.clone(),
1243 HydroNode::BeginAtomic {
1244 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1245 metadata: out_location
1246 .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1247 },
1248 )
1249 }
1250}
1251
1252impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1253where
1254 L: Location<'a> + NoTick,
1255{
1256 /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1257 /// See [`KeyedSingleton::atomic`] for more details.
1258 pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1259 KeyedSingleton::new(
1260 self.location.tick.l.clone(),
1261 HydroNode::EndAtomic {
1262 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1263 metadata: self
1264 .location
1265 .tick
1266 .l
1267 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1268 },
1269 )
1270 }
1271}
1272
1273impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1274 /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1275 /// tick `T` always has the entries of `self` at tick `T - 1`.
1276 ///
1277 /// At tick `0`, the output has no entries, since there is no previous tick.
1278 ///
1279 /// This operator enables stateful iterative processing with ticks, by sending data from one
1280 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1281 ///
1282 /// # Example
1283 /// ```rust
1284 /// # #[cfg(feature = "deploy")] {
1285 /// # use hydro_lang::prelude::*;
1286 /// # use futures::StreamExt;
1287 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1288 /// let tick = process.tick();
1289 /// # // ticks are lazy by default, forces the second tick to run
1290 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1291 /// # let batch_first_tick = process
1292 /// # .source_iter(q!(vec![(1, 2), (2, 3)]))
1293 /// # .batch(&tick, nondet!(/** test */))
1294 /// # .into_keyed();
1295 /// # let batch_second_tick = process
1296 /// # .source_iter(q!(vec![(2, 4), (3, 5)]))
1297 /// # .batch(&tick, nondet!(/** test */))
1298 /// # .into_keyed()
1299 /// # .defer_tick(); // appears on the second tick
1300 /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1301 /// # batch_first_tick.chain(batch_second_tick).first();
1302 /// input_batch.clone().filter_key_not_in(
1303 /// input_batch.defer_tick().keys() // keys present in the previous tick
1304 /// )
1305 /// # .entries().all_ticks()
1306 /// # }, |mut stream| async move {
1307 /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1308 /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1309 /// # assert_eq!(stream.next().await.unwrap(), w);
1310 /// # }
1311 /// # }));
1312 /// # }
1313 /// ```
1314 pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1315 KeyedSingleton::new(
1316 self.location.clone(),
1317 HydroNode::DeferTick {
1318 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1319 metadata: self
1320 .location
1321 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1322 },
1323 )
1324 }
1325}
1326
1327impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1328where
1329 L: Location<'a>,
1330{
1331 /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1332 /// point in time.
1333 ///
1334 /// # Non-Determinism
1335 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1336 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1337 pub fn snapshot(
1338 self,
1339 tick: &Tick<L>,
1340 _nondet: NonDet,
1341 ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1342 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1343 KeyedSingleton::new(
1344 tick.clone(),
1345 HydroNode::Batch {
1346 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1347 metadata: tick
1348 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1349 },
1350 )
1351 }
1352}
1353
1354impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1355where
1356 L: Location<'a> + NoTick,
1357{
1358 /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1359 /// state of the keyed singleton being atomically processed.
1360 ///
1361 /// # Non-Determinism
1362 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1363 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1364 pub fn snapshot_atomic(
1365 self,
1366 tick: &Tick<L>,
1367 _nondet: NonDet,
1368 ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1369 KeyedSingleton::new(
1370 tick.clone(),
1371 HydroNode::Batch {
1372 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1373 metadata: tick
1374 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1375 },
1376 )
1377 }
1378}
1379
1380impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1381where
1382 L: Location<'a>,
1383{
1384 /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1385 ///
1386 /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1387 /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1388 /// is filtered out.
1389 ///
1390 /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
1391 /// not modify or take ownership of the values. If you need to modify the values while filtering
1392 /// use [`KeyedSingleton::filter_map`] instead.
1393 ///
1394 /// # Example
1395 /// ```rust
1396 /// # #[cfg(feature = "deploy")] {
1397 /// # use hydro_lang::prelude::*;
1398 /// # use futures::StreamExt;
1399 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1400 /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1401 /// # process
1402 /// # .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1403 /// # .into_keyed()
1404 /// # .first();
1405 /// keyed_singleton.filter(q!(|&v| v > 1))
1406 /// # .entries()
1407 /// # }, |mut stream| async move {
1408 /// // { 1: 2, 2: 4 }
1409 /// # let mut results = Vec::new();
1410 /// # for _ in 0..2 {
1411 /// # results.push(stream.next().await.unwrap());
1412 /// # }
1413 /// # results.sort();
1414 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1415 /// # }));
1416 /// # }
1417 /// ```
1418 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1419 where
1420 F: Fn(&V) -> bool + 'a,
1421 {
1422 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1423 let filter_f = q!({
1424 let orig = f;
1425 move |t: &(_, _)| orig(&t.1)
1426 })
1427 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1428 .into();
1429
1430 KeyedSingleton::new(
1431 self.location.clone(),
1432 HydroNode::Filter {
1433 f: filter_f,
1434 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1435 metadata: self
1436 .location
1437 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1438 },
1439 )
1440 }
1441
1442 /// An operator that both filters and maps values. It yields only the key-value pairs where
1443 /// the supplied closure `f` returns `Some(value)`.
1444 ///
1445 /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1446 /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1447 /// If it returns `None`, the key-value pair is filtered out.
1448 ///
1449 /// # Example
1450 /// ```rust
1451 /// # #[cfg(feature = "deploy")] {
1452 /// # use hydro_lang::prelude::*;
1453 /// # use futures::StreamExt;
1454 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1455 /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1456 /// # process
1457 /// # .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1458 /// # .into_keyed()
1459 /// # .first();
1460 /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1461 /// # .entries()
1462 /// # }, |mut stream| async move {
1463 /// // { 1: 42, 3: 100 }
1464 /// # let mut results = Vec::new();
1465 /// # for _ in 0..2 {
1466 /// # results.push(stream.next().await.unwrap());
1467 /// # }
1468 /// # results.sort();
1469 /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1470 /// # }));
1471 /// # }
1472 /// ```
1473 pub fn filter_map<F, U>(
1474 self,
1475 f: impl IntoQuotedMut<'a, F, L> + Copy,
1476 ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
1477 where
1478 F: Fn(V) -> Option<U> + 'a,
1479 {
1480 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1481 let filter_map_f = q!({
1482 let orig = f;
1483 move |(k, v)| orig(v).map(|o| (k, o))
1484 })
1485 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1486 .into();
1487
1488 KeyedSingleton::new(
1489 self.location.clone(),
1490 HydroNode::FilterMap {
1491 f: filter_map_f,
1492 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1493 metadata: self.location.new_node_metadata(KeyedSingleton::<
1494 K,
1495 U,
1496 L,
1497 B::EraseMonotonic,
1498 >::collection_kind()),
1499 },
1500 )
1501 }
1502
1503 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1504 /// arrived since the previous batch was released.
1505 ///
1506 /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1507 /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1508 ///
1509 /// # Non-Determinism
1510 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1511 /// has a non-deterministic set of key-value pairs.
1512 pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded>
1513 where
1514 L: NoTick,
1515 {
1516 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1517 KeyedSingleton::new(
1518 tick.clone(),
1519 HydroNode::Batch {
1520 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1521 metadata: tick
1522 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1523 },
1524 )
1525 }
1526}
1527
1528impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1529where
1530 L: Location<'a> + NoTick,
1531{
1532 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1533 /// atomically processed.
1534 ///
1535 /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1536 /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1537 ///
1538 /// # Non-Determinism
1539 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1540 /// has a non-deterministic set of key-value pairs.
1541 pub fn batch_atomic(
1542 self,
1543 tick: &Tick<L>,
1544 nondet: NonDet,
1545 ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1546 let _ = nondet;
1547 KeyedSingleton::new(
1548 tick.clone(),
1549 HydroNode::Batch {
1550 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1551 metadata: tick
1552 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1553 },
1554 )
1555 }
1556}
1557
1558#[cfg(test)]
1559mod tests {
1560 #[cfg(feature = "deploy")]
1561 use futures::{SinkExt, StreamExt};
1562 #[cfg(feature = "deploy")]
1563 use hydro_deploy::Deployment;
1564 #[cfg(any(feature = "deploy", feature = "sim"))]
1565 use stageleft::q;
1566
1567 #[cfg(any(feature = "deploy", feature = "sim"))]
1568 use crate::compile::builder::FlowBuilder;
1569 #[cfg(any(feature = "deploy", feature = "sim"))]
1570 use crate::location::Location;
1571 #[cfg(any(feature = "deploy", feature = "sim"))]
1572 use crate::nondet::nondet;
1573
1574 #[cfg(feature = "deploy")]
1575 #[tokio::test]
1576 async fn key_count_bounded_value() {
1577 let mut deployment = Deployment::new();
1578
1579 let mut flow = FlowBuilder::new();
1580 let node = flow.process::<()>();
1581 let external = flow.external::<()>();
1582
1583 let (input_port, input) = node.source_external_bincode(&external);
1584 let out = input
1585 .into_keyed()
1586 .first()
1587 .key_count()
1588 .sample_eager(nondet!(/** test */))
1589 .send_bincode_external(&external);
1590
1591 let nodes = flow
1592 .with_process(&node, deployment.Localhost())
1593 .with_external(&external, deployment.Localhost())
1594 .deploy(&mut deployment);
1595
1596 deployment.deploy().await.unwrap();
1597
1598 let mut external_in = nodes.connect(input_port).await;
1599 let mut external_out = nodes.connect(out).await;
1600
1601 deployment.start().await.unwrap();
1602
1603 assert_eq!(external_out.next().await.unwrap(), 0);
1604
1605 external_in.send((1, 1)).await.unwrap();
1606 assert_eq!(external_out.next().await.unwrap(), 1);
1607
1608 external_in.send((2, 2)).await.unwrap();
1609 assert_eq!(external_out.next().await.unwrap(), 2);
1610 }
1611
1612 #[cfg(feature = "deploy")]
1613 #[tokio::test]
1614 async fn key_count_unbounded_value() {
1615 let mut deployment = Deployment::new();
1616
1617 let mut flow = FlowBuilder::new();
1618 let node = flow.process::<()>();
1619 let external = flow.external::<()>();
1620
1621 let (input_port, input) = node.source_external_bincode(&external);
1622 let out = input
1623 .into_keyed()
1624 .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1625 .key_count()
1626 .sample_eager(nondet!(/** test */))
1627 .send_bincode_external(&external);
1628
1629 let nodes = flow
1630 .with_process(&node, deployment.Localhost())
1631 .with_external(&external, deployment.Localhost())
1632 .deploy(&mut deployment);
1633
1634 deployment.deploy().await.unwrap();
1635
1636 let mut external_in = nodes.connect(input_port).await;
1637 let mut external_out = nodes.connect(out).await;
1638
1639 deployment.start().await.unwrap();
1640
1641 assert_eq!(external_out.next().await.unwrap(), 0);
1642
1643 external_in.send((1, 1)).await.unwrap();
1644 assert_eq!(external_out.next().await.unwrap(), 1);
1645
1646 external_in.send((1, 2)).await.unwrap();
1647 assert_eq!(external_out.next().await.unwrap(), 1);
1648
1649 external_in.send((2, 2)).await.unwrap();
1650 assert_eq!(external_out.next().await.unwrap(), 2);
1651
1652 external_in.send((1, 1)).await.unwrap();
1653 assert_eq!(external_out.next().await.unwrap(), 2);
1654
1655 external_in.send((3, 1)).await.unwrap();
1656 assert_eq!(external_out.next().await.unwrap(), 3);
1657 }
1658
1659 #[cfg(feature = "deploy")]
1660 #[tokio::test]
1661 async fn into_singleton_bounded_value() {
1662 let mut deployment = Deployment::new();
1663
1664 let mut flow = FlowBuilder::new();
1665 let node = flow.process::<()>();
1666 let external = flow.external::<()>();
1667
1668 let (input_port, input) = node.source_external_bincode(&external);
1669 let out = input
1670 .into_keyed()
1671 .first()
1672 .into_singleton()
1673 .sample_eager(nondet!(/** test */))
1674 .send_bincode_external(&external);
1675
1676 let nodes = flow
1677 .with_process(&node, deployment.Localhost())
1678 .with_external(&external, deployment.Localhost())
1679 .deploy(&mut deployment);
1680
1681 deployment.deploy().await.unwrap();
1682
1683 let mut external_in = nodes.connect(input_port).await;
1684 let mut external_out = nodes.connect(out).await;
1685
1686 deployment.start().await.unwrap();
1687
1688 assert_eq!(
1689 external_out.next().await.unwrap(),
1690 std::collections::HashMap::new()
1691 );
1692
1693 external_in.send((1, 1)).await.unwrap();
1694 assert_eq!(
1695 external_out.next().await.unwrap(),
1696 vec![(1, 1)].into_iter().collect()
1697 );
1698
1699 external_in.send((2, 2)).await.unwrap();
1700 assert_eq!(
1701 external_out.next().await.unwrap(),
1702 vec![(1, 1), (2, 2)].into_iter().collect()
1703 );
1704 }
1705
1706 #[cfg(feature = "deploy")]
1707 #[tokio::test]
1708 async fn into_singleton_unbounded_value() {
1709 let mut deployment = Deployment::new();
1710
1711 let mut flow = FlowBuilder::new();
1712 let node = flow.process::<()>();
1713 let external = flow.external::<()>();
1714
1715 let (input_port, input) = node.source_external_bincode(&external);
1716 let out = input
1717 .into_keyed()
1718 .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1719 .into_singleton()
1720 .sample_eager(nondet!(/** test */))
1721 .send_bincode_external(&external);
1722
1723 let nodes = flow
1724 .with_process(&node, deployment.Localhost())
1725 .with_external(&external, deployment.Localhost())
1726 .deploy(&mut deployment);
1727
1728 deployment.deploy().await.unwrap();
1729
1730 let mut external_in = nodes.connect(input_port).await;
1731 let mut external_out = nodes.connect(out).await;
1732
1733 deployment.start().await.unwrap();
1734
1735 assert_eq!(
1736 external_out.next().await.unwrap(),
1737 std::collections::HashMap::new()
1738 );
1739
1740 external_in.send((1, 1)).await.unwrap();
1741 assert_eq!(
1742 external_out.next().await.unwrap(),
1743 vec![(1, 1)].into_iter().collect()
1744 );
1745
1746 external_in.send((1, 2)).await.unwrap();
1747 assert_eq!(
1748 external_out.next().await.unwrap(),
1749 vec![(1, 2)].into_iter().collect()
1750 );
1751
1752 external_in.send((2, 2)).await.unwrap();
1753 assert_eq!(
1754 external_out.next().await.unwrap(),
1755 vec![(1, 2), (2, 1)].into_iter().collect()
1756 );
1757
1758 external_in.send((1, 1)).await.unwrap();
1759 assert_eq!(
1760 external_out.next().await.unwrap(),
1761 vec![(1, 3), (2, 1)].into_iter().collect()
1762 );
1763
1764 external_in.send((3, 1)).await.unwrap();
1765 assert_eq!(
1766 external_out.next().await.unwrap(),
1767 vec![(1, 3), (2, 1), (3, 1)].into_iter().collect()
1768 );
1769 }
1770
1771 #[cfg(feature = "sim")]
1772 #[test]
1773 fn sim_unbounded_singleton_snapshot() {
1774 let mut flow = FlowBuilder::new();
1775 let node = flow.process::<()>();
1776
1777 let (input_port, input) = node.sim_input();
1778 let output = input
1779 .into_keyed()
1780 .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1781 .snapshot(&node.tick(), nondet!(/** test */))
1782 .entries()
1783 .all_ticks()
1784 .sim_output();
1785
1786 let count = flow.sim().exhaustive(async || {
1787 input_port.send((1, 123));
1788 input_port.send((1, 456));
1789 input_port.send((2, 123));
1790
1791 let all = output.collect_sorted::<Vec<_>>().await;
1792 assert_eq!(all.last().unwrap(), &(2, 1));
1793 });
1794
1795 assert_eq!(count, 8);
1796 }
1797
1798 #[cfg(feature = "deploy")]
1799 #[tokio::test]
1800 async fn join_keyed_stream() {
1801 let mut deployment = Deployment::new();
1802
1803 let mut flow = FlowBuilder::new();
1804 let node = flow.process::<()>();
1805 let external = flow.external::<()>();
1806
1807 let tick = node.tick();
1808 let keyed_data = node
1809 .source_iter(q!(vec![(1, 10), (2, 20)]))
1810 .into_keyed()
1811 .batch(&tick, nondet!(/** test */))
1812 .first();
1813 let requests = node
1814 .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
1815 .into_keyed()
1816 .batch(&tick, nondet!(/** test */));
1817
1818 let out = keyed_data
1819 .join_keyed_stream(requests)
1820 .entries()
1821 .all_ticks()
1822 .send_bincode_external(&external);
1823
1824 let nodes = flow
1825 .with_process(&node, deployment.Localhost())
1826 .with_external(&external, deployment.Localhost())
1827 .deploy(&mut deployment);
1828
1829 deployment.deploy().await.unwrap();
1830
1831 let mut external_out = nodes.connect(out).await;
1832
1833 deployment.start().await.unwrap();
1834
1835 let mut results = vec![];
1836 for _ in 0..2 {
1837 results.push(external_out.next().await.unwrap());
1838 }
1839 results.sort();
1840
1841 assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
1842 }
1843}