hydro_lang/live_collections/keyed_singleton.rs
1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use sealed::sealed;
11use stageleft::{IntoQuotedMut, QuotedWithContext, q};
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_stream::KeyedStream;
15use super::optional::Optional;
16use super::singleton::Singleton;
17use super::sliced::sliced;
18use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
19use crate::compile::builder::{CycleId, FlowState};
20use crate::compile::ir::{
21 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, SharedNode,
22};
23#[cfg(stageleft_runtime)]
24use crate::forward_handle::{CycleCollection, ReceiverComplete};
25use crate::forward_handle::{ForwardRef, TickCycle};
26use crate::live_collections::stream::{Ordering, Retries};
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::DeferTick;
30use crate::location::{Atomic, Location, Tick, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::properties::manual_proof;
34
35/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
36///
37/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
38/// changed, but not removed), this also includes an additional variant [`BoundedValue`], which
39/// indicates that entries may be added over time, but once an entry is added it will never be
40/// removed and its value will never change.
41pub trait KeyedSingletonBound {
42 /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
43 type UnderlyingBound: Boundedness;
44 /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
45 type ValueBound: Boundedness;
46
47 /// The type of the keyed singleton if the value for each key is immutable.
48 type WithBoundedValue: KeyedSingletonBound<
49 UnderlyingBound = Self::UnderlyingBound,
50 ValueBound = Bounded,
51 EraseMonotonic = Self::WithBoundedValue,
52 >;
53
54 /// The [`Boundedness`] of this [`Singleton`] if it is produced from a [`KeyedStream`] with [`Self`] boundedness.
55 type KeyedStreamToMonotone: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;
56
57 /// The [`Boundedness`] of the keyed singleton produced by folding a [`KeyedStream`] with
58 /// [`Self`] boundedness when the aggregation does *not* have a monotonicity proof.
59 ///
60 /// Without a monotonicity proof, the per-key values may change arbitrarily, so an unbounded
61 /// input collapses to [`MonotonicKeys`] (keys are still only added, never removed).
62 type KeyedStreamToNonMonotone: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;
63
64 /// The type of the keyed singleton if the value for each key is no longer monotonic.
65 type EraseMonotonic: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;
66
67 /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
68 fn bound_kind() -> KeyedSingletonBoundKind;
69}
70
71impl KeyedSingletonBound for Unbounded {
72 type UnderlyingBound = Unbounded;
73 type ValueBound = Unbounded;
74 type WithBoundedValue = BoundedValue;
75 type KeyedStreamToMonotone = MonotonicValue;
76 type KeyedStreamToNonMonotone = MonotonicKeys;
77 type EraseMonotonic = Unbounded;
78
79 fn bound_kind() -> KeyedSingletonBoundKind {
80 KeyedSingletonBoundKind::Unbounded
81 }
82}
83
84impl KeyedSingletonBound for Bounded {
85 type UnderlyingBound = Bounded;
86 type ValueBound = Bounded;
87 type WithBoundedValue = Bounded;
88 type KeyedStreamToMonotone = Bounded;
89 type KeyedStreamToNonMonotone = Bounded;
90 type EraseMonotonic = Bounded;
91
92 fn bound_kind() -> KeyedSingletonBoundKind {
93 KeyedSingletonBoundKind::Bounded
94 }
95}
96
97/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
98/// its value is bounded and will never change, but new entries may appear asynchronously
99pub struct BoundedValue;
100
101impl KeyedSingletonBound for BoundedValue {
102 type UnderlyingBound = Unbounded;
103 type ValueBound = Bounded;
104 type WithBoundedValue = BoundedValue;
105 type KeyedStreamToMonotone = BoundedValue;
106 type KeyedStreamToNonMonotone = BoundedValue;
107 type EraseMonotonic = BoundedValue;
108
109 fn bound_kind() -> KeyedSingletonBoundKind {
110 KeyedSingletonBoundKind::BoundedValue
111 }
112}
113
114/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
115/// it will never be removed, and the corresponding value will only increase monotonically.
116pub struct MonotonicValue;
117
118impl KeyedSingletonBound for MonotonicValue {
119 type UnderlyingBound = Unbounded;
120 type ValueBound = Unbounded;
121 type WithBoundedValue = BoundedValue;
122 type KeyedStreamToMonotone = MonotonicValue;
123 type KeyedStreamToNonMonotone = MonotonicKeys;
124 type EraseMonotonic = MonotonicKeys;
125
126 fn bound_kind() -> KeyedSingletonBoundKind {
127 KeyedSingletonBoundKind::MonotonicValue
128 }
129}
130
131/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key
132/// appears, it will never be removed, but the corresponding value may change arbitrarily.
133pub struct MonotonicKeys;
134
135impl KeyedSingletonBound for MonotonicKeys {
136 type UnderlyingBound = Unbounded;
137 type ValueBound = Unbounded;
138 type WithBoundedValue = BoundedValue;
139 type KeyedStreamToMonotone = MonotonicKeys;
140 type KeyedStreamToNonMonotone = MonotonicKeys;
141 type EraseMonotonic = MonotonicKeys;
142
143 fn bound_kind() -> KeyedSingletonBoundKind {
144 KeyedSingletonBoundKind::MonotonicKeys
145 }
146}
147
148#[sealed]
149#[diagnostic::on_unimplemented(
150 message = "The keyed singleton must have monotonic values (`MonotonicValue`) or be bounded (`Bounded`), but has bound `{Self}`. Strengthen the monotonicity upstream or consider a different API.",
151 label = "required here",
152 note = "To intentionally process a non-deterministic snapshot or batch, you may want to use a `sliced!` region. This introduces non-determinism so avoid unless necessary."
153)]
154/// Marker trait that is implemented for [`KeyedSingletonBound`] types whose per-key values
155/// are monotonically non-decreasing (or bounded).
156pub trait IsKeyedMonotonic: KeyedSingletonBound {}
157
158#[sealed]
159#[diagnostic::do_not_recommend]
160impl IsKeyedMonotonic for MonotonicValue {}
161
162#[sealed]
163#[diagnostic::do_not_recommend]
164impl IsKeyedMonotonic for BoundedValue {}
165
166#[sealed]
167#[diagnostic::do_not_recommend]
168impl<B: IsBounded + KeyedSingletonBound> IsKeyedMonotonic for B {}
169
170/// Mapping from keys of type `K` to values of type `V`.
171///
172/// Keyed Singletons capture an asynchronously updated mapping from keys of the `K` to values of
173/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
174/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
175/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
176/// keys cannot be removed and the value for each key is immutable.
177///
178/// Type Parameters:
179/// - `K`: the type of the key for each entry
180/// - `V`: the type of the value for each entry
181/// - `Loc`: the [`Location`] where the keyed singleton is materialized
182/// - `Bound`: tracks whether the entries are:
183/// - [`Bounded`] (local and finite)
184/// - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
185/// - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
186pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
187 pub(crate) location: Loc,
188 pub(crate) ir_node: RefCell<HydroNode>,
189 pub(crate) flow_state: FlowState,
190
191 _phantom: PhantomData<(K, V, Loc, Bound)>,
192}
193
194impl<K, V, L, B: KeyedSingletonBound> Drop for KeyedSingleton<K, V, L, B> {
195 fn drop(&mut self) {
196 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
197 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
198 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
199 input: Box::new(ir_node),
200 op_metadata: HydroIrOpMetadata::new(),
201 });
202 }
203 }
204}
205
206impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
207 for KeyedSingleton<K, V, Loc, Bound>
208{
209 fn clone(&self) -> Self {
210 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
211 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
212 *self.ir_node.borrow_mut() = HydroNode::Tee {
213 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
214 metadata: self.location.new_node_metadata(Self::collection_kind()),
215 };
216 }
217
218 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
219 KeyedSingleton {
220 location: self.location.clone(),
221 flow_state: self.flow_state.clone(),
222 ir_node: HydroNode::Tee {
223 inner: SharedNode(inner.0.clone()),
224 metadata: metadata.clone(),
225 }
226 .into(),
227 _phantom: PhantomData,
228 }
229 } else {
230 unreachable!()
231 }
232 }
233}
234
235impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
236 for KeyedSingleton<K, V, L, B>
237where
238 L: Location<'a>,
239{
240 type Location = L;
241
242 fn create_source(cycle_id: CycleId, location: L) -> Self {
243 KeyedSingleton {
244 flow_state: location.flow_state().clone(),
245 location: location.clone(),
246 ir_node: RefCell::new(HydroNode::CycleSource {
247 cycle_id,
248 metadata: location.new_node_metadata(Self::collection_kind()),
249 }),
250 _phantom: PhantomData,
251 }
252 }
253}
254
255impl<'a, K, V, L> CycleCollection<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
256where
257 L: Location<'a>,
258{
259 type Location = Tick<L>;
260
261 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
262 KeyedSingleton::new(
263 location.clone(),
264 HydroNode::CycleSource {
265 cycle_id,
266 metadata: location.new_node_metadata(Self::collection_kind()),
267 },
268 )
269 }
270}
271
272impl<'a, K, V, L> DeferTick for KeyedSingleton<K, V, Tick<L>, Bounded>
273where
274 L: Location<'a>,
275{
276 fn defer_tick(self) -> Self {
277 KeyedSingleton::defer_tick(self)
278 }
279}
280
281impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
282 for KeyedSingleton<K, V, L, B>
283where
284 L: Location<'a>,
285{
286 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
287 assert_eq!(
288 Location::id(&self.location),
289 expected_location,
290 "locations do not match"
291 );
292 self.location
293 .flow_state()
294 .borrow_mut()
295 .push_root(HydroRoot::CycleSink {
296 cycle_id,
297 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
298 op_metadata: HydroIrOpMetadata::new(),
299 });
300 }
301}
302
303impl<'a, K, V, L> ReceiverComplete<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
304where
305 L: Location<'a>,
306{
307 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
308 assert_eq!(
309 Location::id(&self.location),
310 expected_location,
311 "locations do not match"
312 );
313 self.location
314 .flow_state()
315 .borrow_mut()
316 .push_root(HydroRoot::CycleSink {
317 cycle_id,
318 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
319 op_metadata: HydroIrOpMetadata::new(),
320 });
321 }
322}
323
324impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
325 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
326 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
327 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
328
329 let flow_state = location.flow_state().clone();
330 KeyedSingleton {
331 location,
332 flow_state,
333 ir_node: RefCell::new(ir_node),
334 _phantom: PhantomData,
335 }
336 }
337
338 /// Returns the [`Location`] where this keyed singleton is being materialized.
339 pub fn location(&self) -> &L {
340 &self.location
341 }
342
343 /// Weakens the consistency of this live collection to not guarantee any consistency across
344 /// cluster members (if this collection is on a cluster).
345 pub fn weaken_consistency(self) -> KeyedSingleton<K, V, L::DropConsistency, B>
346 where
347 L: Location<'a>,
348 {
349 if L::consistency()
350 .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
351 {
352 // already no consistency
353 KeyedSingleton::new(
354 self.location.drop_consistency(),
355 self.ir_node.replace(HydroNode::Placeholder),
356 )
357 } else {
358 KeyedSingleton::new(
359 self.location.drop_consistency(),
360 HydroNode::Cast {
361 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
362 metadata: self
363 .location
364 .drop_consistency()
365 .new_node_metadata(
366 KeyedSingleton::<K, V, L::DropConsistency, B>::collection_kind(),
367 ),
368 },
369 )
370 }
371 }
372
373 /// Casts this live collection to have the consistency guarantees specified in the given
374 /// location type parameter. The developer must ensure that the strengthened consistency
375 /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
376 pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
377 self,
378 _proof: impl crate::properties::ConsistencyProof,
379 ) -> KeyedSingleton<K, V, L2, B>
380 where
381 L: Location<'a>,
382 {
383 if L::consistency() == L2::consistency() {
384 // already consistent
385 KeyedSingleton::new(
386 self.location.with_consistency_of(),
387 self.ir_node.replace(HydroNode::Placeholder),
388 )
389 } else {
390 KeyedSingleton::new(
391 self.location.with_consistency_of(),
392 HydroNode::AssertIsConsistent {
393 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
394 trusted: false,
395 metadata: self
396 .location
397 .clone()
398 .with_consistency_of::<L2>()
399 .new_node_metadata(KeyedSingleton::<K, V, L2, B>::collection_kind()),
400 },
401 )
402 }
403 }
404}
405
406#[cfg(stageleft_runtime)]
407fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
408 me: KeyedSingleton<K, V, L, Bounded>,
409) -> Singleton<usize, L, Bounded> {
410 me.entries().count()
411}
412
413#[cfg(stageleft_runtime)]
414fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
415 me: KeyedSingleton<K, V, L, Bounded>,
416) -> Singleton<HashMap<K, V>, L, Bounded>
417where
418 K: Eq + Hash,
419{
420 me.entries()
421 .assume_ordering_trusted(nondet!(
422 /// There is only one element associated with each key. The closure technically
423 /// isn't commutative in the case where both passed entries have the same key
424 /// but different values.
425 ///
426 /// In the future, we may want to have an `assume!(...)` statement in the UDF that
427 /// the key is never already present in the map.
428 ))
429 .fold(
430 q!(|| HashMap::new()),
431 q!(|map, (k, v)| {
432 map.insert(k, v);
433 }),
434 )
435}
436
437impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
438 pub(crate) fn collection_kind() -> CollectionKind {
439 CollectionKind::KeyedSingleton {
440 bound: B::bound_kind(),
441 key_type: stageleft::quote_type::<K>().into(),
442 value_type: stageleft::quote_type::<V>().into(),
443 }
444 }
445
446 /// Transforms each value by invoking `f` on each element, with keys staying the same
447 /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
448 ///
449 /// If you do not want to modify the stream and instead only want to view
450 /// each item use [`KeyedSingleton::inspect`] instead.
451 ///
452 /// # Example
453 /// ```rust
454 /// # #[cfg(feature = "deploy")] {
455 /// # use hydro_lang::prelude::*;
456 /// # use futures::StreamExt;
457 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
458 /// let keyed_singleton = // { 1: 2, 2: 4 }
459 /// # process
460 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
461 /// # .into_keyed()
462 /// # .first();
463 /// keyed_singleton.map(q!(|v| v + 1))
464 /// # .entries()
465 /// # }, |mut stream| async move {
466 /// // { 1: 3, 2: 5 }
467 /// # let mut results = Vec::new();
468 /// # for _ in 0..2 {
469 /// # results.push(stream.next().await.unwrap());
470 /// # }
471 /// # results.sort();
472 /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
473 /// # }));
474 /// # }
475 /// ```
476 pub fn map<U, F>(
477 self,
478 f: impl IntoQuotedMut<'a, F, L> + Copy,
479 ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
480 where
481 F: Fn(V) -> U + 'a,
482 {
483 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
484 let map_f = q!({
485 let orig = f;
486 move |(k, v)| (k, orig(v))
487 })
488 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
489 .into();
490
491 KeyedSingleton::new(
492 self.location.clone(),
493 HydroNode::Map {
494 f: map_f,
495 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
496 metadata: self.location.new_node_metadata(KeyedSingleton::<
497 K,
498 U,
499 L,
500 B::EraseMonotonic,
501 >::collection_kind()),
502 },
503 )
504 }
505
506 /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
507 /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
508 ///
509 /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
510 /// the new value `U`. The key remains unchanged in the output.
511 ///
512 /// # Example
513 /// ```rust
514 /// # #[cfg(feature = "deploy")] {
515 /// # use hydro_lang::prelude::*;
516 /// # use futures::StreamExt;
517 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
518 /// let keyed_singleton = // { 1: 2, 2: 4 }
519 /// # process
520 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
521 /// # .into_keyed()
522 /// # .first();
523 /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
524 /// # .entries()
525 /// # }, |mut stream| async move {
526 /// // { 1: 3, 2: 6 }
527 /// # let mut results = Vec::new();
528 /// # for _ in 0..2 {
529 /// # results.push(stream.next().await.unwrap());
530 /// # }
531 /// # results.sort();
532 /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
533 /// # }));
534 /// # }
535 /// ```
536 pub fn map_with_key<U, F>(
537 self,
538 f: impl IntoQuotedMut<'a, F, L> + Copy,
539 ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
540 where
541 F: Fn((K, V)) -> U + 'a,
542 K: Clone,
543 {
544 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
545 let map_f = q!({
546 let orig = f;
547 move |(k, v)| {
548 let out = orig((Clone::clone(&k), v));
549 (k, out)
550 }
551 })
552 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
553 .into();
554
555 KeyedSingleton::new(
556 self.location.clone(),
557 HydroNode::Map {
558 f: map_f,
559 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
560 metadata: self.location.new_node_metadata(KeyedSingleton::<
561 K,
562 U,
563 L,
564 B::EraseMonotonic,
565 >::collection_kind()),
566 },
567 )
568 }
569
570 /// Gets the number of keys in the keyed singleton.
571 ///
572 /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
573 /// since keys may be added / removed over time. When the set of keys changes, the count will
574 /// be asynchronously updated.
575 ///
576 /// # Example
577 /// ```rust
578 /// # #[cfg(feature = "deploy")] {
579 /// # use hydro_lang::prelude::*;
580 /// # use futures::StreamExt;
581 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
582 /// # let tick = process.tick();
583 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
584 /// # process
585 /// # .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
586 /// # .into_keyed()
587 /// # .batch(&tick, nondet!(/** test */))
588 /// # .first();
589 /// keyed_singleton.key_count()
590 /// # .all_ticks()
591 /// # }, |mut stream| async move {
592 /// // 3
593 /// # assert_eq!(stream.next().await.unwrap(), 3);
594 /// # }));
595 /// # }
596 /// ```
597 pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
598 if B::ValueBound::BOUNDED {
599 let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
600 location: self.location.clone(),
601 flow_state: self.flow_state.clone(),
602 ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
603 _phantom: PhantomData,
604 };
605
606 me.entries().count().ignore_monotonic()
607 } else if L::is_top_level()
608 && let Some(tick) = self.location.try_tick()
609 && (B::bound_kind() == KeyedSingletonBoundKind::Unbounded
610 || B::bound_kind() == KeyedSingletonBoundKind::MonotonicKeys
611 || B::bound_kind() == KeyedSingletonBoundKind::MonotonicValue)
612 {
613 let location = self.location.clone();
614 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
615 let me: KeyedSingleton<K, V, L, MonotonicKeys> =
616 KeyedSingleton::new(location.clone(), ir_node);
617
618 let out =
619 key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
620 .latest();
621 Singleton::new(location, out.ir_node.replace(HydroNode::Placeholder))
622 } else {
623 panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
624 }
625 }
626
627 /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
628 ///
629 /// As the values for each key are updated asynchronously, the `HashMap` will be updated
630 /// asynchronously as well.
631 ///
632 /// # Example
633 /// ```rust
634 /// # #[cfg(feature = "deploy")] {
635 /// # use hydro_lang::prelude::*;
636 /// # use futures::StreamExt;
637 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
638 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
639 /// # process
640 /// # .source_iter(q!(vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())]))
641 /// # .into_keyed()
642 /// # .batch(&process.tick(), nondet!(/** test */))
643 /// # .first();
644 /// keyed_singleton.into_singleton()
645 /// # .all_ticks()
646 /// # }, |mut stream| async move {
647 /// // { 1: "a", 2: "b", 3: "c" }
648 /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())].into_iter().collect());
649 /// # }));
650 /// # }
651 /// ```
652 pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
653 where
654 K: Eq + Hash,
655 {
656 if B::ValueBound::BOUNDED {
657 let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
658 location: self.location.clone(),
659 flow_state: self.flow_state.clone(),
660 ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
661 _phantom: PhantomData,
662 };
663
664 me.entries()
665 .assume_ordering_trusted(nondet!(
666 /// There is only one element associated with each key. The closure technically
667 /// isn't commutative in the case where both passed entries have the same key
668 /// but different values.
669 ///
670 /// In the future, we may want to have an `assume!(...)` statement in the UDF that
671 /// the key is never already present in the map.
672 ))
673 .fold(
674 q!(|| HashMap::new()),
675 q!(|map, (k, v)| {
676 // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
677 map.insert(k, v);
678 }),
679 )
680 } else if L::is_top_level()
681 && let Some(tick) = self.location.try_tick()
682 && (B::bound_kind() == KeyedSingletonBoundKind::Unbounded
683 || B::bound_kind() == KeyedSingletonBoundKind::MonotonicKeys
684 || B::bound_kind() == KeyedSingletonBoundKind::MonotonicValue)
685 {
686 let location = self.location.clone();
687 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
688 let me: KeyedSingleton<K, V, L, MonotonicKeys> =
689 KeyedSingleton::new(location.clone(), ir_node);
690
691 let out = into_singleton_inside_tick(
692 me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
693 )
694 .latest();
695 Singleton::new(location, out.ir_node.replace(HydroNode::Placeholder))
696 } else {
697 panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
698 }
699 }
700
701 /// An operator which allows you to "name" a `HydroNode`.
702 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
703 pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
704 {
705 let mut node = self.ir_node.borrow_mut();
706 let metadata = node.metadata_mut();
707 metadata.tag = Some(name.to_owned());
708 }
709 self
710 }
711
712 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
713 /// implies that `B == Bounded`.
714 pub fn make_bounded(self) -> KeyedSingleton<K, V, L, Bounded>
715 where
716 B: IsBounded,
717 {
718 KeyedSingleton::new(
719 self.location.clone(),
720 self.ir_node.replace(HydroNode::Placeholder),
721 )
722 }
723
724 /// Gets the value associated with a specific key from the keyed singleton.
725 /// Returns `None` if the key is `None` or there is no associated value.
726 ///
727 /// # Example
728 /// ```rust
729 /// # #[cfg(feature = "deploy")] {
730 /// # use hydro_lang::prelude::*;
731 /// # use futures::StreamExt;
732 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
733 /// let tick = process.tick();
734 /// let keyed_data = process
735 /// .source_iter(q!(vec![(1, 2), (2, 3)]))
736 /// .into_keyed()
737 /// .batch(&tick, nondet!(/** test */))
738 /// .first();
739 /// let key = tick.singleton(q!(1));
740 /// keyed_data.get(key).all_ticks()
741 /// # }, |mut stream| async move {
742 /// // 2
743 /// # assert_eq!(stream.next().await.unwrap(), 2);
744 /// # }));
745 /// # }
746 /// ```
747 pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Optional<V, L, Bounded>
748 where
749 B: IsBounded,
750 K: Hash + Eq + Clone,
751 V: Clone,
752 {
753 self.make_bounded()
754 .into_keyed_stream()
755 .get(key)
756 .cast_at_most_one_element()
757 }
758
759 /// Emit a keyed stream containing keys shared between the keyed singleton and the
760 /// keyed stream, where each value in the output keyed stream is a tuple of
761 /// (the keyed singleton's value, the keyed stream's value).
762 ///
763 /// # Example
764 /// ```rust
765 /// # #[cfg(feature = "deploy")] {
766 /// # use hydro_lang::prelude::*;
767 /// # use futures::StreamExt;
768 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
769 /// let tick = process.tick();
770 /// let keyed_data = process
771 /// .source_iter(q!(vec![(1, 10), (2, 20)]))
772 /// .into_keyed()
773 /// .batch(&tick, nondet!(/** test */))
774 /// .first();
775 /// let other_data = process
776 /// .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
777 /// .into_keyed()
778 /// .batch(&tick, nondet!(/** test */));
779 /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
780 /// # }, |mut stream| async move {
781 /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
782 /// # let mut results = vec![];
783 /// # for _ in 0..3 {
784 /// # results.push(stream.next().await.unwrap());
785 /// # }
786 /// # results.sort();
787 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
788 /// # }));
789 /// # }
790 /// ```
791 pub fn join_keyed_stream<O2: Ordering, R2: Retries, V2, B2: Boundedness>(
792 self,
793 other: KeyedStream<K, V2, L, B2, O2, R2>,
794 ) -> KeyedStream<K, (V, V2), L, B2, O2, R2>
795 where
796 B: IsBounded,
797 K: Eq + Hash + Clone,
798 V: Clone,
799 V2: Clone,
800 {
801 // TODO(shadaj): if DFIR guarantees that joining unbounded keyed stream x bounded keyed stream
802 // always produces deterministic order per key (nested loop join), this could just use
803 // `join_keyed_stream` without constructing IRs manually
804 KeyedStream::new(
805 self.location.clone(),
806 HydroNode::Join {
807 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
808 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
809 metadata: self
810 .location
811 .new_node_metadata(KeyedStream::<K, (V, V2), L, B2, O2, R2>::collection_kind()),
812 },
813 )
814 }
815
816 /// Emit a keyed singleton containing all keys shared between two keyed singletons,
817 /// where each value in the output keyed singleton is a tuple of
818 /// (self.value, other.value).
819 ///
820 /// # Example
821 /// ```rust
822 /// # #[cfg(feature = "deploy")] {
823 /// # use hydro_lang::prelude::*;
824 /// # use futures::StreamExt;
825 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
826 /// # let tick = process.tick();
827 /// let requests = // { 1: 10, 2: 20, 3: 30 }
828 /// # process
829 /// # .source_iter(q!(vec![(1, 10), (2, 20), (3, 30)]))
830 /// # .into_keyed()
831 /// # .batch(&tick, nondet!(/** test */))
832 /// # .first();
833 /// let other = // { 1: 100, 2: 200, 4: 400 }
834 /// # process
835 /// # .source_iter(q!(vec![(1, 100), (2, 200), (4, 400)]))
836 /// # .into_keyed()
837 /// # .batch(&tick, nondet!(/** test */))
838 /// # .first();
839 /// requests.join_keyed_singleton(other)
840 /// # .entries().all_ticks()
841 /// # }, |mut stream| async move {
842 /// // { 1: (10, 100), 2: (20, 200) }
843 /// # let mut results = vec![];
844 /// # for _ in 0..2 {
845 /// # results.push(stream.next().await.unwrap());
846 /// # }
847 /// # results.sort();
848 /// # assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
849 /// # }));
850 /// # }
851 /// ```
852 pub fn join_keyed_singleton<V2: Clone>(
853 self,
854 other: KeyedSingleton<K, V2, L, Bounded>,
855 ) -> KeyedSingleton<K, (V, V2), L, Bounded>
856 where
857 B: IsBounded,
858 K: Eq + Hash + Clone,
859 V: Clone,
860 {
861 let result_stream = self
862 .make_bounded()
863 .entries()
864 .join(other.entries())
865 .into_keyed();
866
867 // The cast is guaranteed to succeed, since each key (in both `self` and `other`) has at most one value.
868 result_stream.cast_at_most_one_entry_per_key()
869 }
870
871 /// For each value in `self`, find the matching key in `lookup`.
872 /// The output is a keyed singleton with the key from `self`, and a value
873 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
874 /// If the key is not present in `lookup`, the option will be [`None`].
875 ///
876 /// # Example
877 /// ```rust
878 /// # #[cfg(feature = "deploy")] {
879 /// # use hydro_lang::prelude::*;
880 /// # use futures::StreamExt;
881 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
882 /// # let tick = process.tick();
883 /// let requests = // { 1: 10, 2: 20 }
884 /// # process
885 /// # .source_iter(q!(vec![(1, 10), (2, 20)]))
886 /// # .into_keyed()
887 /// # .batch(&tick, nondet!(/** test */))
888 /// # .first();
889 /// let other_data = // { 10: 100, 11: 110 }
890 /// # process
891 /// # .source_iter(q!(vec![(10, 100), (11, 110)]))
892 /// # .into_keyed()
893 /// # .batch(&tick, nondet!(/** test */))
894 /// # .first();
895 /// requests.lookup_keyed_singleton(other_data)
896 /// # .entries().all_ticks()
897 /// # }, |mut stream| async move {
898 /// // { 1: (10, Some(100)), 2: (20, None) }
899 /// # let mut results = vec![];
900 /// # for _ in 0..2 {
901 /// # results.push(stream.next().await.unwrap());
902 /// # }
903 /// # results.sort();
904 /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
905 /// # }));
906 /// # }
907 /// ```
908 pub fn lookup_keyed_singleton<V2>(
909 self,
910 lookup: KeyedSingleton<V, V2, L, Bounded>,
911 ) -> KeyedSingleton<K, (V, Option<V2>), L, Bounded>
912 where
913 B: IsBounded,
914 K: Eq + Hash + Clone,
915 V: Eq + Hash + Clone,
916 V2: Clone,
917 {
918 let result_stream = self
919 .make_bounded()
920 .into_keyed_stream()
921 .lookup_keyed_stream(lookup.into_keyed_stream());
922
923 // The cast is guaranteed to succeed since both lookup and self contain at most 1 value per key
924 result_stream.cast_at_most_one_entry_per_key()
925 }
926
927 /// For each value in `self`, find the matching key in `lookup`.
928 /// The output is a keyed stream with the key from `self`, and a value
929 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
930 /// If the key is not present in `lookup`, the option will be [`None`].
931 ///
932 /// # Example
933 /// ```rust
934 /// # #[cfg(feature = "deploy")] {
935 /// # use hydro_lang::prelude::*;
936 /// # use futures::StreamExt;
937 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
938 /// # let tick = process.tick();
939 /// let requests = // { 1: 10, 2: 20 }
940 /// # process
941 /// # .source_iter(q!(vec![(1, 10), (2, 20)]))
942 /// # .into_keyed()
943 /// # .batch(&tick, nondet!(/** test */))
944 /// # .first();
945 /// let other_data = // { 10: 100, 10: 110 }
946 /// # process
947 /// # .source_iter(q!(vec![(10, 100), (10, 110)]))
948 /// # .into_keyed()
949 /// # .batch(&tick, nondet!(/** test */));
950 /// requests.lookup_keyed_stream(other_data)
951 /// # .entries().all_ticks()
952 /// # }, |mut stream| async move {
953 /// // { 1: [(10, Some(100)), (10, Some(110))], 2: (20, None) }
954 /// # let mut results = vec![];
955 /// # for _ in 0..3 {
956 /// # results.push(stream.next().await.unwrap());
957 /// # }
958 /// # results.sort();
959 /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(110))), (2, (20, None))]);
960 /// # }));
961 /// # }
962 /// ```
963 pub fn lookup_keyed_stream<V2, O: Ordering, R: Retries>(
964 self,
965 lookup: KeyedStream<V, V2, L, Bounded, O, R>,
966 ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
967 where
968 B: IsBounded,
969 K: Eq + Hash + Clone,
970 V: Eq + Hash + Clone,
971 V2: Clone,
972 {
973 self.make_bounded()
974 .entries()
975 .weaken_retries::<R>() // TODO: Once weaken_retries() is implemented for KeyedSingleton, remove entries() and into_keyed()
976 .into_keyed()
977 .lookup_keyed_stream(lookup)
978 }
979
    /// For each key present in both `self` and `thresholds`, emits a [`KeyedStream`] event the first
    /// time that key's value becomes greater than or equal to the corresponding threshold value.
    /// The emitted value for each key is the threshold value itself.
    ///
    /// This requires the keyed singleton to have monotonic values ([`MonotonicValue`],
    /// [`BoundedValue`], or [`Bounded`]). Otherwise the threshold detection would be
    /// non-deterministic: a non-monotone value could rise above the threshold and fall back
    /// below it before being observed.
    ///
    /// The `thresholds` parameter is a [`BoundedValue`] keyed singleton mapping each key to its
    /// threshold. Thresholds may arrive asynchronously (new keys appear over time), but once set
    /// for a key, the threshold value is fixed. Late-arriving thresholds are checked against the
    /// current snapshot value immediately.
    ///
    /// Each key is emitted at most once. In the [`MonotonicValue`] and [`BoundedValue`] cases
    /// the operator keeps a record of the keys that have already crossed their threshold. That
    /// state grows with the number of crossed keys.
    ///
    /// # Example
    /// ```rust,ignore
    /// use hydro_lang::prelude::*;
    ///
    /// // Given a monotonically increasing keyed singleton (e.g. from fold with monotone proof)
    /// let counts: KeyedSingleton<u32, usize, _, MonotonicValue> = events.into_keyed()
    ///     .fold(q!(|| 0), q!(|acc, _| *acc += 1, monotone = manual_proof!(/** +1 is monotone */)));
    ///
    /// // BoundedValue keyed singleton of thresholds (from .first())
    /// let thresholds = threshold_source.into_keyed().first();
    ///
    /// // Emits (key, threshold_value) the first time each key's value >= threshold
    /// let crossed = counts.threshold_greater_or_equal(thresholds);
    /// ```
    pub fn threshold_greater_or_equal(
        self,
        thresholds: KeyedSingleton<K, V, L, BoundedValue>,
    ) -> KeyedStream<K, V, L, B::UnderlyingBound, NoOrder, ExactlyOnce>
    where
        K: Clone + Eq + Hash,
        V: Clone + PartialOrd,
        B: IsKeyedMonotonic,
    {
        let self_location = self.location.clone();
        // Dispatch on the runtime tag of `B`. Each arm re-wraps `self`'s IR node with the
        // concrete bound type so that bound-specific APIs (`entries`, `sliced!` `use`) apply.
        match B::bound_kind() {
            KeyedSingletonBoundKind::Bounded => {
                // Bounded case: self is already fixed, just join and filter
                // NOTE(review): `thresholds` is `BoundedValue` and may still gain keys over time,
                // while the result here is typed with `B::UnderlyingBound` (= `Bounded`).
                // Confirm this is intended.
                let me: KeyedSingleton<K, V, L, Bounded> = KeyedSingleton::new(
                    self.location.clone(),
                    self.ir_node.replace(HydroNode::Placeholder),
                );
                // `join` pairs entries by key, yielding `(k, (self_value, threshold))`.
                let result = me
                    .entries()
                    .join(thresholds.entries())
                    .filter_map(q!(|(k, (val, thresh))| {
                        if val >= thresh {
                            Some((k, thresh))
                        } else {
                            None
                        }
                    }))
                    .into_keyed();
                KeyedStream::new(
                    result.location.clone(),
                    result.ir_node.replace(HydroNode::Placeholder),
                )
            }
            KeyedSingletonBoundKind::MonotonicValue => {
                let me: KeyedSingleton<K, V, L, MonotonicValue> = KeyedSingleton::new(
                    self.location.clone(),
                    self.ir_node.replace(HydroNode::Placeholder),
                );

                // Each slice snapshots both collections. `already_crossed` is slice state that
                // starts empty and accumulates the keys already emitted, so each key fires once.
                let result = sliced! {
                    let snapshot = use(me, nondet!(/** thresholds are deterministic */));
                    let thresh_snapshot =
                        use(thresholds, nondet!(/** thresholds are deterministic */));
                    let mut already_crossed =
                        use::state_null::<Stream<K, Tick<_>, Bounded, NoOrder>>();

                    // Join order is thresholds first, so elements are `(k, (thresh, val))`.
                    let joined = thresh_snapshot.entries().join(snapshot.entries());
                    let passed = joined
                        .filter(q!(|(_, (thresh, val))| *val >= *thresh))
                        .map(q!(|(k, (thresh, _))| (k, thresh)));

                    // Drop keys emitted in earlier slices, then remember the new ones.
                    let newly_crossed = passed.anti_join(already_crossed.clone());
                    already_crossed =
                        already_crossed.chain(newly_crossed.clone().map(q!(|(k, _)| k)));

                    newly_crossed.into_keyed()
                };

                KeyedStream::new(
                    self_location,
                    result.ir_node.replace(HydroNode::Placeholder),
                )
            }
            KeyedSingletonBoundKind::BoundedValue => {
                // Same logic as the MonotonicValue arm; only the concrete bound type of `me`
                // differs.
                let me: KeyedSingleton<K, V, L, BoundedValue> = KeyedSingleton::new(
                    self.location.clone(),
                    self.ir_node.replace(HydroNode::Placeholder),
                );

                let result = sliced! {
                    let snapshot = use(me, nondet!(/** thresholds are deterministic */));
                    let thresh_snapshot =
                        use(thresholds, nondet!(/** thresholds are deterministic */));
                    let mut already_crossed =
                        use::state_null::<Stream<K, Tick<_>, Bounded, NoOrder>>();

                    let joined = thresh_snapshot.entries().join(snapshot.entries());
                    let passed = joined
                        .filter(q!(|(_, (thresh, val))| *val >= *thresh))
                        .map(q!(|(k, (thresh, _))| (k, thresh)));

                    let newly_crossed = passed.anti_join(already_crossed.clone());
                    already_crossed =
                        already_crossed.chain(newly_crossed.clone().map(q!(|(k, _)| k)));

                    newly_crossed.into_keyed()
                };

                KeyedStream::new(
                    self_location,
                    result.ir_node.replace(HydroNode::Placeholder),
                )
            }
            _ => {
                unreachable!(
                    "IsKeyedMonotonic is only implemented for Bounded, BoundedValue, and MonotonicValue"
                )
            }
        }
    }
1106
    /// Like [`Self::threshold_greater_or_equal`], but uses a single [`Singleton`] threshold
    /// shared across all keys. Emits a `(K, V)` event for each key the first time that key's
    /// value becomes >= the threshold. The emitted value is the threshold itself.
    ///
    /// Because the threshold is a [`Bounded`] singleton, its value is fixed, so there is no
    /// per-key threshold collection to maintain or join against. In the [`MonotonicValue`] and
    /// [`BoundedValue`] cases the operator still keeps per-key state recording which keys have
    /// already crossed the threshold. That state grows with the number of crossed keys.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// // A keyed singleton of per-key values (in practice often a monotone counter): { 1: 6, 2: 4 }
    /// let counts = process
    ///     .source_iter(q!(vec![(1, 6), (2, 4)]))
    ///     .into_keyed()
    ///     .first();
    ///
    /// // A single threshold value shared across all keys
    /// let threshold = process.singleton(q!(5));
    ///
    /// // Emits (key, threshold) the first time each key's value >= threshold
    /// counts.threshold_greater_or_equal_uniform(threshold)
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 1: 5 } -- key 1's value 6 >= 5, but key 2's value 4 < 5
    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
    /// # }));
    /// # }
    /// ```
    pub fn threshold_greater_or_equal_uniform(
        self,
        threshold: Singleton<V, L, Bounded>,
    ) -> KeyedStream<K, V, L, B::UnderlyingBound, NoOrder, ExactlyOnce>
    where
        K: Clone + Eq + Hash,
        V: Clone + PartialOrd,
        B: IsKeyedMonotonic,
    {
        let self_location = self.location.clone();
        // Dispatch on the runtime tag of `B`. Each arm re-wraps `self`'s IR node with the
        // concrete bound type so that bound-specific APIs apply.
        match B::bound_kind() {
            KeyedSingletonBoundKind::Bounded => {
                let me: KeyedSingleton<K, V, L, Bounded> = KeyedSingleton::new(
                    self.location.clone(),
                    self.ir_node.replace(HydroNode::Placeholder),
                );
                // Pair every entry with the shared threshold: `((k, val), thresh)`.
                let result = me
                    .entries()
                    .cross_singleton(threshold)
                    .filter_map(q!(|((k, val), thresh)| {
                        if val >= thresh {
                            Some((k, thresh))
                        } else {
                            None
                        }
                    }))
                    .into_keyed();
                KeyedStream::new(
                    result.location.clone(),
                    result.ir_node.replace(HydroNode::Placeholder),
                )
            }
            KeyedSingletonBoundKind::MonotonicValue => {
                let me: KeyedSingleton<K, V, L, MonotonicValue> = KeyedSingleton::new(
                    self.location.clone(),
                    self.ir_node.replace(HydroNode::Placeholder),
                );

                // Each slice snapshots `me`. `already_crossed` is slice state that starts empty
                // and accumulates the keys already emitted, so each key fires once.
                let result = sliced! {
                    let snapshot = use(me, nondet!(/** thresholds are deterministic */));
                    let mut already_crossed =
                        use::state_null::<Stream<K, Tick<_>, Bounded, NoOrder>>();

                    // Bring the (fixed) threshold into the slice's tick so it can be crossed
                    // with the snapshot.
                    let tick = snapshot.location().clone();
                    let thresh_in_tick = threshold.clone_into_tick(&tick);

                    let crossing = snapshot
                        .entries()
                        .cross_singleton(thresh_in_tick)
                        .filter_map(q!(|((k, val), thresh)| {
                            if val >= thresh {
                                Some((k, thresh))
                            } else {
                                None
                            }
                        }));

                    // Drop keys emitted in earlier slices, then remember the new ones.
                    let newly_crossed = crossing.anti_join(already_crossed.clone());
                    already_crossed =
                        already_crossed.chain(newly_crossed.clone().map(q!(|(k, _)| k)));

                    newly_crossed.into_keyed()
                };

                KeyedStream::new(
                    self_location,
                    result.ir_node.replace(HydroNode::Placeholder),
                )
            }
            KeyedSingletonBoundKind::BoundedValue => {
                // Same logic as the MonotonicValue arm; only the concrete bound type of `me`
                // differs.
                let me: KeyedSingleton<K, V, L, BoundedValue> = KeyedSingleton::new(
                    self.location.clone(),
                    self.ir_node.replace(HydroNode::Placeholder),
                );

                let result = sliced! {
                    let snapshot = use(me, nondet!(/** thresholds are deterministic */));
                    let mut already_crossed =
                        use::state_null::<Stream<K, Tick<_>, Bounded, NoOrder>>();

                    let tick = snapshot.location().clone();
                    let thresh_in_tick = threshold.clone_into_tick(&tick);

                    let crossing = snapshot
                        .entries()
                        .cross_singleton(thresh_in_tick)
                        .filter_map(q!(|((k, val), thresh)| {
                            if val >= thresh {
                                Some((k, thresh))
                            } else {
                                None
                            }
                        }));

                    let newly_crossed = crossing.anti_join(already_crossed.clone());
                    already_crossed =
                        already_crossed.chain(newly_crossed.clone().map(q!(|(k, _)| k)));

                    newly_crossed.into_keyed()
                };

                KeyedStream::new(
                    self_location,
                    result.ir_node.replace(HydroNode::Placeholder),
                )
            }
            _ => {
                unreachable!(
                    "IsKeyedMonotonic is only implemented for Bounded, BoundedValue, and MonotonicValue"
                )
            }
        }
    }
1251}
1252
1253impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
1254 KeyedSingleton<K, V, L, B>
1255{
1256 /// Flattens the keyed singleton into an unordered stream of key-value pairs.
1257 ///
1258 /// The value for each key must be bounded, otherwise the resulting stream elements would be
1259 /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
1260 /// into the output.
1261 ///
1262 /// # Example
1263 /// ```rust
1264 /// # #[cfg(feature = "deploy")] {
1265 /// # use hydro_lang::prelude::*;
1266 /// # use futures::StreamExt;
1267 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1268 /// let keyed_singleton = // { 1: 2, 2: 4 }
1269 /// # process
1270 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1271 /// # .into_keyed()
1272 /// # .first();
1273 /// keyed_singleton.entries()
1274 /// # }, |mut stream| async move {
1275 /// // (1, 2), (2, 4) in any order
1276 /// # let mut results = Vec::new();
1277 /// # for _ in 0..2 {
1278 /// # results.push(stream.next().await.unwrap());
1279 /// # }
1280 /// # results.sort();
1281 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1282 /// # }));
1283 /// # }
1284 /// ```
1285 pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
1286 self.into_keyed_stream().entries()
1287 }
1288
1289 /// Flattens the keyed singleton into an unordered stream of just the values.
1290 ///
1291 /// The value for each key must be bounded, otherwise the resulting stream elements would be
1292 /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
1293 /// into the output.
1294 ///
1295 /// # Example
1296 /// ```rust
1297 /// # #[cfg(feature = "deploy")] {
1298 /// # use hydro_lang::prelude::*;
1299 /// # use futures::StreamExt;
1300 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1301 /// let keyed_singleton = // { 1: 2, 2: 4 }
1302 /// # process
1303 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1304 /// # .into_keyed()
1305 /// # .first();
1306 /// keyed_singleton.values()
1307 /// # }, |mut stream| async move {
1308 /// // 2, 4 in any order
1309 /// # let mut results = Vec::new();
1310 /// # for _ in 0..2 {
1311 /// # results.push(stream.next().await.unwrap());
1312 /// # }
1313 /// # results.sort();
1314 /// # assert_eq!(results, vec![2, 4]);
1315 /// # }));
1316 /// # }
1317 /// ```
1318 pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
1319 let map_f = q!(|(_, v)| v)
1320 .splice_fn1_ctx::<(K, V), V>(&self.location)
1321 .into();
1322
1323 Stream::new(
1324 self.location.clone(),
1325 HydroNode::Map {
1326 f: map_f,
1327 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1328 metadata: self.location.new_node_metadata(Stream::<
1329 V,
1330 L,
1331 B::UnderlyingBound,
1332 NoOrder,
1333 ExactlyOnce,
1334 >::collection_kind()),
1335 },
1336 )
1337 }
1338
1339 /// Flattens the keyed singleton into an unordered stream of just the keys.
1340 ///
1341 /// The value for each key must be bounded, otherwise the removal of keys would result in
1342 /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
1343 /// into the output.
1344 ///
1345 /// # Example
1346 /// ```rust
1347 /// # #[cfg(feature = "deploy")] {
1348 /// # use hydro_lang::prelude::*;
1349 /// # use futures::StreamExt;
1350 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1351 /// let keyed_singleton = // { 1: 2, 2: 4 }
1352 /// # process
1353 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1354 /// # .into_keyed()
1355 /// # .first();
1356 /// keyed_singleton.keys()
1357 /// # }, |mut stream| async move {
1358 /// // 1, 2 in any order
1359 /// # let mut results = Vec::new();
1360 /// # for _ in 0..2 {
1361 /// # results.push(stream.next().await.unwrap());
1362 /// # }
1363 /// # results.sort();
1364 /// # assert_eq!(results, vec![1, 2]);
1365 /// # }));
1366 /// # }
1367 /// ```
1368 pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
1369 self.entries().map(q!(|(k, _)| k))
1370 }
1371
1372 /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
1373 /// entries whose keys are not in the provided stream.
1374 ///
1375 /// # Example
1376 /// ```rust
1377 /// # #[cfg(feature = "deploy")] {
1378 /// # use hydro_lang::prelude::*;
1379 /// # use futures::StreamExt;
1380 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1381 /// let tick = process.tick();
1382 /// let keyed_singleton = // { 1: 2, 2: 4 }
1383 /// # process
1384 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1385 /// # .into_keyed()
1386 /// # .first()
1387 /// # .batch(&tick, nondet!(/** test */));
1388 /// let keys_to_remove = process
1389 /// .source_iter(q!(vec![1]))
1390 /// .batch(&tick, nondet!(/** test */));
1391 /// keyed_singleton.filter_key_not_in(keys_to_remove)
1392 /// # .entries().all_ticks()
1393 /// # }, |mut stream| async move {
1394 /// // { 2: 4 }
1395 /// # for w in vec![(2, 4)] {
1396 /// # assert_eq!(stream.next().await.unwrap(), w);
1397 /// # }
1398 /// # }));
1399 /// # }
1400 /// ```
1401 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1402 self,
1403 other: Stream<K, L, Bounded, O2, R2>,
1404 ) -> Self
1405 where
1406 K: Hash + Eq,
1407 {
1408 check_matching_location(&self.location, &other.location);
1409
1410 KeyedSingleton::new(
1411 self.location.clone(),
1412 HydroNode::AntiJoin {
1413 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1414 neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1415 metadata: self.location.new_node_metadata(Self::collection_kind()),
1416 },
1417 )
1418 }
1419
1420 /// An operator which allows you to "inspect" each value of a keyed singleton without
1421 /// modifying it. The closure `f` is called on a reference to each value. This is
1422 /// mainly useful for debugging, and should not be used to generate side-effects.
1423 ///
1424 /// # Example
1425 /// ```rust
1426 /// # #[cfg(feature = "deploy")] {
1427 /// # use hydro_lang::prelude::*;
1428 /// # use futures::StreamExt;
1429 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1430 /// let keyed_singleton = // { 1: 2, 2: 4 }
1431 /// # process
1432 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1433 /// # .into_keyed()
1434 /// # .first();
1435 /// keyed_singleton
1436 /// .inspect(q!(|v| println!("{}", v)))
1437 /// # .entries()
1438 /// # }, |mut stream| async move {
1439 /// // { 1: 2, 2: 4 }
1440 /// # for w in vec![(1, 2), (2, 4)] {
1441 /// # assert_eq!(stream.next().await.unwrap(), w);
1442 /// # }
1443 /// # }));
1444 /// # }
1445 /// ```
1446 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1447 where
1448 F: Fn(&V) + 'a,
1449 {
1450 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1451 let inspect_f = q!({
1452 let orig = f;
1453 move |t: &(_, _)| orig(&t.1)
1454 })
1455 .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1456 .into();
1457
1458 KeyedSingleton::new(
1459 self.location.clone(),
1460 HydroNode::Inspect {
1461 f: inspect_f,
1462 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1463 metadata: self.location.new_node_metadata(Self::collection_kind()),
1464 },
1465 )
1466 }
1467
1468 /// An operator which allows you to "inspect" each entry of a keyed singleton without
1469 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1470 /// mainly useful for debugging, and should not be used to generate side-effects.
1471 ///
1472 /// # Example
1473 /// ```rust
1474 /// # #[cfg(feature = "deploy")] {
1475 /// # use hydro_lang::prelude::*;
1476 /// # use futures::StreamExt;
1477 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1478 /// let keyed_singleton = // { 1: 2, 2: 4 }
1479 /// # process
1480 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1481 /// # .into_keyed()
1482 /// # .first();
1483 /// keyed_singleton
1484 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1485 /// # .entries()
1486 /// # }, |mut stream| async move {
1487 /// // { 1: 2, 2: 4 }
1488 /// # for w in vec![(1, 2), (2, 4)] {
1489 /// # assert_eq!(stream.next().await.unwrap(), w);
1490 /// # }
1491 /// # }));
1492 /// # }
1493 /// ```
1494 pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1495 where
1496 F: Fn(&(K, V)) + 'a,
1497 {
1498 let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1499
1500 KeyedSingleton::new(
1501 self.location.clone(),
1502 HydroNode::Inspect {
1503 f: inspect_f,
1504 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1505 metadata: self.location.new_node_metadata(Self::collection_kind()),
1506 },
1507 )
1508 }
1509
1510 /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
1511 ///
1512 /// Because this method requires values to be bounded, the output [`Optional`] will only be
1513 /// asynchronously updated if a new key is added that is higher than the previous max key.
1514 ///
1515 /// # Example
1516 /// ```rust
1517 /// # #[cfg(feature = "deploy")] {
1518 /// # use hydro_lang::prelude::*;
1519 /// # use futures::StreamExt;
1520 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1521 /// let tick = process.tick();
1522 /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
1523 /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 123), (2, 456), (0, 789)])))
1524 /// # .into_keyed()
1525 /// # .first();
1526 /// keyed_singleton.get_max_key()
1527 /// # .sample_eager(nondet!(/** test */))
1528 /// # }, |mut stream| async move {
1529 /// // (2, 456)
1530 /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
1531 /// # }));
1532 /// # }
1533 /// ```
1534 pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
1535 where
1536 K: Ord,
1537 {
1538 self.entries()
1539 .assume_ordering_trusted(nondet!(
1540 /// There is only one element associated with each key, and the keys are totallly
1541 /// ordered so we will produce a deterministic value. The closure technically
1542 /// isn't commutative in the case where both passed entries have the same key
1543 /// but different values.
1544 ///
1545 /// In the future, we may want to have an `assume!(...)` statement in the UDF that
1546 /// the two inputs do not have the same key.
1547 ))
1548 .reduce(q!(
1549 move |curr, new| {
1550 if new.0 > curr.0 {
1551 *curr = new;
1552 }
1553 },
1554 idempotent = manual_proof!(/** repeated elements are ignored */)
1555 ))
1556 }
1557
1558 /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
1559 /// element, the value.
1560 ///
1561 /// This is the equivalent of [`Singleton::into_stream`] but keyed.
1562 ///
1563 /// # Example
1564 /// ```rust
1565 /// # #[cfg(feature = "deploy")] {
1566 /// # use hydro_lang::prelude::*;
1567 /// # use futures::StreamExt;
1568 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1569 /// let keyed_singleton = // { 1: 2, 2: 4 }
1570 /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 2), (2, 4)])))
1571 /// # .into_keyed()
1572 /// # .first();
1573 /// keyed_singleton
1574 /// .clone()
1575 /// .into_keyed_stream()
1576 /// .merge_unordered(
1577 /// keyed_singleton.into_keyed_stream()
1578 /// )
1579 /// # .entries()
1580 /// # }, |mut stream| async move {
1581 /// /// // { 1: [2, 2], 2: [4, 4] }
1582 /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
1583 /// # assert_eq!(stream.next().await.unwrap(), w);
1584 /// # }
1585 /// # }));
1586 /// # }
1587 /// ```
1588 pub fn into_keyed_stream(
1589 self,
1590 ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
1591 KeyedStream::new(
1592 self.location.clone(),
1593 HydroNode::Cast {
1594 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1595 metadata: self.location.new_node_metadata(KeyedStream::<
1596 K,
1597 V,
1598 L,
1599 B::UnderlyingBound,
1600 TotalOrder,
1601 ExactlyOnce,
1602 >::collection_kind()),
1603 },
1604 )
1605 }
1606}
1607
1608impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1609where
1610 L: Location<'a>,
1611{
1612 /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1613 /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1614 ///
1615 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1616 /// processed before an acknowledgement is emitted.
1617 pub fn atomic(self) -> KeyedSingleton<K, V, Atomic<L>, B> {
1618 let id = self.location.flow_state().borrow_mut().next_clock_id();
1619 let out_location = Atomic {
1620 tick: Tick {
1621 id,
1622 l: self.location.clone(),
1623 },
1624 };
1625 KeyedSingleton::new(
1626 out_location.clone(),
1627 HydroNode::BeginAtomic {
1628 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1629 metadata: out_location
1630 .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1631 },
1632 )
1633 }
1634}
1635
1636impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1637where
1638 L: Location<'a>,
1639{
1640 /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1641 /// See [`KeyedSingleton::atomic`] for more details.
1642 pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1643 KeyedSingleton::new(
1644 self.location.tick.l.clone(),
1645 HydroNode::EndAtomic {
1646 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1647 metadata: self
1648 .location
1649 .tick
1650 .l
1651 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1652 },
1653 )
1654 }
1655}
1656
1657impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1658 /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1659 /// tick `T` always has the entries of `self` at tick `T - 1`.
1660 ///
1661 /// At tick `0`, the output has no entries, since there is no previous tick.
1662 ///
1663 /// This operator enables stateful iterative processing with ticks, by sending data from one
1664 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1665 ///
1666 /// # Example
1667 /// ```rust
1668 /// # #[cfg(feature = "deploy")] {
1669 /// # use hydro_lang::prelude::*;
1670 /// # use futures::StreamExt;
1671 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1672 /// let tick = process.tick();
1673 /// # // ticks are lazy by default, forces the second tick to run
1674 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1675 /// # let batch_first_tick = process
1676 /// # .source_iter(q!(vec![(1, 2), (2, 3)]))
1677 /// # .batch(&tick, nondet!(/** test */))
1678 /// # .into_keyed();
1679 /// # let batch_second_tick = process
1680 /// # .source_iter(q!(vec![(2, 4), (3, 5)]))
1681 /// # .batch(&tick, nondet!(/** test */))
1682 /// # .into_keyed()
1683 /// # .defer_tick(); // appears on the second tick
1684 /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1685 /// # batch_first_tick.chain(batch_second_tick).first();
1686 /// input_batch.clone().filter_key_not_in(
1687 /// input_batch.defer_tick().keys() // keys present in the previous tick
1688 /// )
1689 /// # .entries().all_ticks()
1690 /// # }, |mut stream| async move {
1691 /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1692 /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1693 /// # assert_eq!(stream.next().await.unwrap(), w);
1694 /// # }
1695 /// # }));
1696 /// # }
1697 /// ```
1698 pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1699 KeyedSingleton::new(
1700 self.location.clone(),
1701 HydroNode::DeferTick {
1702 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1703 metadata: self
1704 .location
1705 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1706 },
1707 )
1708 }
1709}
1710
1711impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1712where
1713 L: Location<'a>,
1714{
1715 /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1716 /// point in time.
1717 ///
1718 /// # Non-Determinism
1719 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1720 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1721 pub fn snapshot<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1722 self,
1723 tick: &Tick<L2>,
1724 _nondet: NonDet,
1725 ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1726 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1727 KeyedSingleton::new(
1728 tick.drop_consistency(),
1729 HydroNode::Batch {
1730 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1731 metadata: tick
1732 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1733 },
1734 )
1735 }
1736}
1737
1738impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1739where
1740 L: Location<'a>,
1741{
1742 /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1743 /// state of the keyed singleton being atomically processed.
1744 ///
1745 /// # Non-Determinism
1746 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1747 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1748 pub fn snapshot_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1749 self,
1750 tick: &Tick<L2>,
1751 _nondet: NonDet,
1752 ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1753 KeyedSingleton::new(
1754 tick.drop_consistency(),
1755 HydroNode::Batch {
1756 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1757 metadata: tick
1758 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1759 },
1760 )
1761 }
1762}
1763
1764impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1765where
1766 L: Location<'a>,
1767{
1768 /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1769 ///
1770 /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1771 /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1772 /// is filtered out.
1773 ///
1774 /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
1775 /// not modify or take ownership of the values. If you need to modify the values while filtering
1776 /// use [`KeyedSingleton::filter_map`] instead.
1777 ///
1778 /// # Example
1779 /// ```rust
1780 /// # #[cfg(feature = "deploy")] {
1781 /// # use hydro_lang::prelude::*;
1782 /// # use futures::StreamExt;
1783 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1784 /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1785 /// # process
1786 /// # .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1787 /// # .into_keyed()
1788 /// # .first();
1789 /// keyed_singleton.filter(q!(|&v| v > 1))
1790 /// # .entries()
1791 /// # }, |mut stream| async move {
1792 /// // { 1: 2, 2: 4 }
1793 /// # let mut results = Vec::new();
1794 /// # for _ in 0..2 {
1795 /// # results.push(stream.next().await.unwrap());
1796 /// # }
1797 /// # results.sort();
1798 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1799 /// # }));
1800 /// # }
1801 /// ```
1802 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1803 where
1804 F: Fn(&V) -> bool + 'a,
1805 {
1806 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1807 let filter_f = q!({
1808 let orig = f;
1809 move |t: &(_, _)| orig(&t.1)
1810 })
1811 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1812 .into();
1813
1814 KeyedSingleton::new(
1815 self.location.clone(),
1816 HydroNode::Filter {
1817 f: filter_f,
1818 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1819 metadata: self
1820 .location
1821 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1822 },
1823 )
1824 }
1825
1826 /// An operator that both filters and maps values. It yields only the key-value pairs where
1827 /// the supplied closure `f` returns `Some(value)`.
1828 ///
1829 /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1830 /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1831 /// If it returns `None`, the key-value pair is filtered out.
1832 ///
1833 /// # Example
1834 /// ```rust
1835 /// # #[cfg(feature = "deploy")] {
1836 /// # use hydro_lang::prelude::*;
1837 /// # use futures::StreamExt;
1838 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1839 /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1840 /// # process
1841 /// # .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1842 /// # .into_keyed()
1843 /// # .first();
1844 /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1845 /// # .entries()
1846 /// # }, |mut stream| async move {
1847 /// // { 1: 42, 3: 100 }
1848 /// # let mut results = Vec::new();
1849 /// # for _ in 0..2 {
1850 /// # results.push(stream.next().await.unwrap());
1851 /// # }
1852 /// # results.sort();
1853 /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1854 /// # }));
1855 /// # }
1856 /// ```
1857 pub fn filter_map<F, U>(
1858 self,
1859 f: impl IntoQuotedMut<'a, F, L> + Copy,
1860 ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
1861 where
1862 F: Fn(V) -> Option<U> + 'a,
1863 {
1864 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1865 let filter_map_f = q!({
1866 let orig = f;
1867 move |(k, v)| orig(v).map(|o| (k, o))
1868 })
1869 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1870 .into();
1871
1872 KeyedSingleton::new(
1873 self.location.clone(),
1874 HydroNode::FilterMap {
1875 f: filter_map_f,
1876 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1877 metadata: self.location.new_node_metadata(KeyedSingleton::<
1878 K,
1879 U,
1880 L,
1881 B::EraseMonotonic,
1882 >::collection_kind()),
1883 },
1884 )
1885 }
1886
1887 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1888 /// arrived since the previous batch was released.
1889 ///
1890 /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1891 /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1892 ///
1893 /// # Non-Determinism
1894 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1895 /// has a non-deterministic set of key-value pairs.
1896 pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1897 self,
1898 tick: &Tick<L2>,
1899 _nondet: NonDet,
1900 ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1901 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1902 KeyedSingleton::new(
1903 tick.drop_consistency(),
1904 HydroNode::Batch {
1905 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1906 metadata: tick
1907 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1908 },
1909 )
1910 }
1911}
1912
1913impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1914where
1915 L: Location<'a>,
1916{
1917 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1918 /// atomically processed.
1919 ///
1920 /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1921 /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1922 ///
1923 /// # Non-Determinism
1924 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1925 /// has a non-deterministic set of key-value pairs.
1926 pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1927 self,
1928 tick: &Tick<L2>,
1929 nondet: NonDet,
1930 ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1931 let _ = nondet;
1932 KeyedSingleton::new(
1933 tick.drop_consistency(),
1934 HydroNode::Batch {
1935 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1936 metadata: tick
1937 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1938 },
1939 )
1940 }
1941}
1942
#[cfg(test)]
mod tests {
    // Tests for `KeyedSingleton`. Tests gated on `deploy` run a localhost deployment and exchange
    // data through external bincode ports; tests gated on `sim` drive the flow through
    // `flow.sim().exhaustive(...)`.
    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use stageleft::q;

    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::nondet::nondet;

    /// `key_count` on a keyed singleton built with `first()`: the sampled count starts at 0 and
    /// increases by one for each new key sent (1, then 2).
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn key_count_bounded_value() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .first()
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Before any input arrives the keyed singleton has no keys.
        assert_eq!(external_out.next().await.unwrap(), 0);

        external_in.send((1, 1)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send((2, 2)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 2);
    }

    /// `key_count` on a per-key counting fold: repeat sends to an existing key leave the count
    /// unchanged; only sends to new keys (1, 2, 3) increase it.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn key_count_unbounded_value() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 0);

        external_in.send((1, 1)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        // Key 1 already exists, so the count stays at 1.
        external_in.send((1, 2)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send((2, 2)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 2);

        external_in.send((1, 1)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 2);

        external_in.send((3, 1)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }

    /// `into_singleton` over `first()` entries: the sampled map starts empty and gains one
    /// entry per new key.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_bounded_value() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .first()
            .into_singleton()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        assert_eq!(
            external_out.next().await.unwrap(),
            std::collections::HashMap::new()
        );

        external_in.send((1, 1)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 1)].into_iter().collect()
        );

        external_in.send((2, 2)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 1), (2, 2)].into_iter().collect()
        );
    }

    /// `into_singleton` over a per-key counting fold: each sampled map shows the current count
    /// for every key, including updates to keys that already exist.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_unbounded_value() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .into_singleton()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        assert_eq!(
            external_out.next().await.unwrap(),
            std::collections::HashMap::new()
        );

        external_in.send((1, 1)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 1)].into_iter().collect()
        );

        // The fold counts messages per key and ignores the payload value.
        external_in.send((1, 2)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 2)].into_iter().collect()
        );

        external_in.send((2, 2)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 2), (2, 1)].into_iter().collect()
        );

        external_in.send((1, 1)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 3), (2, 1)].into_iter().collect()
        );

        external_in.send((3, 1)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 3), (2, 1), (3, 1)].into_iter().collect()
        );
    }

    /// Snapshots a per-key counting fold into a tick and simulates it exhaustively with three
    /// inputs. Checks that the last element of the sorted output is `(2, 1)` and that exactly
    /// 8 executions are explored.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_unbounded_singleton_snapshot() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (input_port, input) = node.sim_input();
        let output = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .snapshot(&node.tick(), nondet!(/** test */))
            .entries()
            .all_ticks()
            .sim_output();

        let count = flow.sim().exhaustive(async || {
            input_port.send((1, 123));
            input_port.send((1, 456));
            input_port.send((2, 123));

            let all = output.collect_sorted::<Vec<_>>().await;
            assert_eq!(all.last().unwrap(), &(2, 1));
        });

        assert_eq!(count, 8);
    }

    /// Joins a batched keyed singleton `{1: 10, 2: 20}` with a batched keyed stream of requests.
    /// Key 3 has no singleton entry, so only keys 1 and 2 appear in the output.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn join_keyed_stream() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let tick = node.tick();
        let keyed_data = node
            .source_iter(q!(vec![(1, 10), (2, 20)]))
            .into_keyed()
            .batch(&tick, nondet!(/** test */))
            .first();
        let requests = node
            .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
            .into_keyed()
            .batch(&tick, nondet!(/** test */));

        let out = keyed_data
            .join_keyed_stream(requests)
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Output order is not asserted, so collect the two expected results and sort them.
        let mut results = vec![];
        for _ in 0..2 {
            results.push(external_out.next().await.unwrap());
        }
        results.sort();

        assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
    }

    /// `threshold_greater_or_equal` on a fold declared monotonic via a manual proof, with
    /// per-key thresholds. As asserted below, each output entry pairs a key with its threshold
    /// (not its accumulated value).
    #[cfg(feature = "sim")]
    #[test]
    fn threshold_greater_or_equal_monotonic() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (input_port, input) = node.sim_input::<(u32, usize), _, _>();
        let (thresh_port, thresh_input) = node.sim_input::<(u32, usize), _, _>();

        // Create a monotonically increasing keyed singleton via fold with monotone proof
        let counts: super::KeyedSingleton<u32, usize, _, super::MonotonicValue> =
            input.into_keyed().fold(
                q!(|| 0usize),
                q!(
                    |acc, v| *acc += v,
                    monotone = crate::properties::manual_proof!(/** += is monotonic */)
                ),
            );

        // BoundedValue keyed singleton of thresholds (from .first() on unbounded stream)
        let thresholds = thresh_input.into_keyed().first();

        let output = counts
            .threshold_greater_or_equal(thresholds)
            .entries()
            .sim_output();

        let count = flow.sim().exhaustive(async || {
            // Set thresholds: key 1 needs value >= 5, key 2 needs value >= 10
            thresh_port.send((1, 5));
            thresh_port.send((2, 10));

            // key 1 gets increments: 3 + 3 = 6, which is >= 5 ✓
            input_port.send((1, 3));
            input_port.send((1, 3));
            // key 2 gets increments: 3 + 3 = 6, which is < 10 ✗
            input_port.send((2, 3));
            input_port.send((2, 3));

            let results = output.collect_sorted::<Vec<_>>().await;
            assert_eq!(results, vec![(1, 5)]);
        });

        // Only checks that at least one execution was explored.
        assert!(count > 0);
    }

    /// `threshold_greater_or_equal_uniform` on a monotonic fold, with a single threshold (5)
    /// shared by all keys.
    #[cfg(feature = "sim")]
    #[test]
    fn threshold_greater_or_equal_uniform() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (input_port, input) = node.sim_input::<(u32, usize), _, _>();

        let counts: super::KeyedSingleton<u32, usize, _, super::MonotonicValue> =
            input.into_keyed().fold(
                q!(|| 0usize),
                q!(
                    |acc, v| *acc += v,
                    monotone = crate::properties::manual_proof!(/** += is monotonic */)
                ),
            );

        // Uniform threshold: all keys need value >= 5
        let threshold = node.singleton(q!(5usize));

        let output = counts
            .threshold_greater_or_equal_uniform(threshold)
            .entries()
            .sim_output();

        let count = flow.sim().exhaustive(async || {
            // key 1: 3 + 3 = 6 >= 5 ✓
            input_port.send((1, 3));
            input_port.send((1, 3));
            // key 2: 2 + 2 = 4 < 5 ✗
            input_port.send((2, 2));
            input_port.send((2, 2));

            let results = output.collect_sorted::<Vec<_>>().await;
            assert_eq!(results, vec![(1, 5)]);
        });

        assert!(count > 0);
    }

    /// `threshold_greater_or_equal` where both the values and the thresholds come from
    /// `first()`. Key 1 (value 5, threshold 3) passes and is emitted with its threshold.
    #[cfg(feature = "sim")]
    #[test]
    fn threshold_greater_or_equal_bounded_value() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (input_port, input) = node.sim_input::<(u32, usize), _, _>();
        let (thresh_port, thresh_input) = node.sim_input::<(u32, usize), _, _>();

        // BoundedValue keyed singleton (values fixed once per key via .first())
        let values = input.into_keyed().first();

        // BoundedValue keyed singleton of thresholds
        let thresholds = thresh_input.into_keyed().first();

        let output = values
            .threshold_greater_or_equal(thresholds)
            .entries()
            .sim_output();

        let count = flow.sim().exhaustive(async || {
            // Set thresholds: key 1 needs >= 3, key 2 needs >= 10
            thresh_port.send((1, 3));
            thresh_port.send((2, 10));

            // key 1 gets value 5 >= 3 ✓, key 2 gets value 4 < 10 ✗
            input_port.send((1, 5));
            input_port.send((2, 4));

            let results = output.collect_sorted::<Vec<_>>().await;
            assert_eq!(results, vec![(1, 3)]);
        });

        assert!(count > 0);
    }

    /// `threshold_greater_or_equal_uniform` on values from `first()`, with a shared threshold
    /// of 5.
    #[cfg(feature = "sim")]
    #[test]
    fn threshold_greater_or_equal_uniform_bounded_value() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (input_port, input) = node.sim_input::<(u32, usize), _, _>();

        // BoundedValue keyed singleton (values fixed once per key via .first())
        let values = input.into_keyed().first();

        // Uniform threshold: all keys need value >= 5
        let threshold = node.singleton(q!(5usize));

        let output = values
            .threshold_greater_or_equal_uniform(threshold)
            .entries()
            .sim_output();

        let count = flow.sim().exhaustive(async || {
            // key 1 gets value 7 >= 5 ✓, key 2 gets value 3 < 5 ✗
            input_port.send((1, 7));
            input_port.send((2, 3));

            let results = output.collect_sorted::<Vec<_>>().await;
            assert_eq!(results, vec![(1, 5)]);
        });

        assert!(count > 0);
    }

    /// `threshold_greater_or_equal` where the values come from a fixed `source_iter` and only
    /// the thresholds arrive through a simulated input.
    #[cfg(feature = "sim")]
    #[test]
    fn threshold_greater_or_equal_bounded() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        // Bounded keyed singleton (fully known upfront)
        let values = node
            .source_iter(q!(vec![(1, 6usize), (2, 4usize)]))
            .into_keyed()
            .first();

        // BoundedValue thresholds (from async source)
        let (thresh_port, thresh_input) = node.sim_input::<(u32, usize), _, _>();
        let thresholds = thresh_input.into_keyed().first();

        let output = values
            .threshold_greater_or_equal(thresholds)
            .entries()
            .sim_output();

        let count = flow.sim().exhaustive(async || {
            thresh_port.send((1, 5));
            thresh_port.send((2, 10));

            // key 1: 6 >= 5 ✓, key 2: 4 < 10 ✗
            let results = output.collect_sorted::<Vec<_>>().await;
            assert_eq!(results, vec![(1, 5)]);
        });

        assert!(count > 0);
    }

    /// `threshold_greater_or_equal_uniform` with both the values (`source_iter`) and the
    /// threshold (`node.singleton`) fixed up front; the simulation sends no inputs.
    #[cfg(feature = "sim")]
    #[test]
    fn threshold_greater_or_equal_uniform_bounded() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let values = node
            .source_iter(q!(vec![(1, 6usize), (2, 4usize)]))
            .into_keyed()
            .first();
        let threshold = node.singleton(q!(5usize));

        let output = values
            .threshold_greater_or_equal_uniform(threshold)
            .entries()
            .sim_output();

        let count = flow.sim().exhaustive(async || {
            // key 1: 6 >= 5 ✓, key 2: 4 < 5 ✗
            let results = output.collect_sorted::<Vec<_>>().await;
            assert_eq!(results, vec![(1, 5)]);
        });

        assert!(count > 0);
    }
}