# Parallel streaming in Haskell: Part 4 – Conditionals and non-blocking evaluation

*Site: www.channable.com*

Here is the final blog post about the parallel streaming evaluator we use at Channable, where we use Haskell with the Conduit library to produce both result values and parallel work units in a single stream. We assume that you've read the previous three parts, so go ahead and do that now!

In this blog post we explain how we implemented conditionals. More precisely, we show how we can efficiently send only values that match a condition through an aggregation. As a preliminary to conditionals, we need to ensure that evaluation is done in a non-blocking fashion.

## Non-blocking evaluation

Sometimes during evaluation we have to wait on parallel work that is currently being evaluated. Examples where this might occur are:

- during the joining phase of aggregations, we know which input blocks we want to join together, but those input blocks may not have been produced yet
- if a `sinkItems` has produced work to consume every incoming stream, it can't do anything until all results are available
- in the `sequentialize` variation we can have similar situations, where it can't yield any items until an incoming stream has completed

The most obvious place to do the waiting is within the top-level conduit. That's precisely what we did with the `takeMVar`s in the `sinkItems` implementation of the first blog post. Here's the code snippet:

```haskell
Nothing -> fmap concat $ traverse (liftIO . takeMVar) chunkVars
```

A real problem with this is that the consumer of this conduit can't detect when such a blocking wait occurs. A function like our `runConduitWithWork` might observe that it takes a long time to take a step in the conduit, but it's hard to see whether it's doing useful work or just blocking. The ability to see that a work producer can't do anything at the moment is crucial in our implementation of conditionals, where we'll have two branches, and if one blocks we may want to proceed working on the other branch. This will be further explained in the next chapter.

Our solution is to have a very strict non-blocking policy, and instead allow our conduits to yield a special signal that tells the consumer that it can't make progress. For this purpose we add a new constructor to our `WorkOr` type:

```haskell
data WorkOr a
  = WOWork (IO ())
  | WOValue a
  | WONothingYet
```

The `takeMVar` call is now forbidden by our non-blocking policy, but we can write a drop-in replacement for the `liftIO . takeMVar` combination that does what we need:

```haskell
nonBlockingTakeMVar :: MVar a -> ConduitT i (WorkOr o) IO a
nonBlockingTakeMVar var = liftIO (tryTakeMVar var) >>= \case
  Nothing -> do
    Conduit.yield WONothingYet
    nonBlockingTakeMVar var
  Just x -> pure x
```

This looks a bit like a busy wait with repeated `tryTakeMVar` calls, but the `yield` in between makes it slightly different. A carefully written consumer can observe when the `WONothingYet` is being yielded and gets to decide how to continue. In our `runConduitWithWork` implementation we just use a `Control.Concurrent.yield` (unrelated to `Conduit.yield`) to actively allow any other waiting threads to run before we continue our loop. We used to have a small `threadDelay` of 100 microseconds here, but that was inefficient: it cost more CPU time and the job ran slower.

```haskell
HaveOutput pipe WONothingYet -> do
  Control.Concurrent.yield
  withPipe pipe
```

Another aspect of the non-blocking policy is that we never block within a parallel work unit or within a parallel stream.
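To see the polling idea in isolation, here is a minimal, base-only sketch of the same pattern outside of conduit. `pollMVar` is a hypothetical name (not from our codebase): it polls with `tryTakeMVar` and calls `Control.Concurrent.yield` between attempts, instead of blocking in `takeMVar`:

```haskell
import Control.Concurrent
import Control.Concurrent.MVar

-- Hypothetical base-only analogue of nonBlockingTakeMVar: poll the MVar
-- and yield to the scheduler between attempts instead of blocking.
pollMVar :: MVar a -> IO a
pollMVar var = do
  m <- tryTakeMVar var
  case m of
    Nothing -> yield >> pollMVar var  -- Control.Concurrent.yield
    Just x  -> pure x

main :: IO ()
main = do
  var <- newEmptyMVar
  _ <- forkIO (putMVar var (42 :: Int))
  x <- pollMVar var
  print x
```

In the real conduit version the spot where this sketch calls `Control.Concurrent.yield` instead emits `WONothingYet`, which moves the decision of how to wait from the producer to the consumer.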
In other words, we only yield a `WOWork` parallel work unit or a `ParallelStream` when it can be evaluated immediately, in full, without blocking. So for example, when we're in the joining phase of an aggregation, we only yield a work unit to join two blocks once the required blocks are available (as opposed to just yielding the work and then blocking during evaluation of that work). This gives us two important properties:

- We always make progress, even when the number of `WOWork` parallel work units that we run at the same time is limited. You don't want all your threads to be blocked on results from a work unit that will never run because all threads are taken.
- The evaluator can accurately measure how much work actually happens and how long we're waiting. This is necessary to estimate the optimal number of threads, as discussed in Parallel streaming in Haskell: Part 3 – A parallel work consumer.

## Conditionals

In our tool, users can specify conditionals. Let's say we have a conditional like this:

```
IF length (.description) > 10 THEN action1 ELSE action2
```

The intent is quite clear: when the length of the `description` field is more than 10 we should apply `action1`, and otherwise we should apply `action2`. This works intuitively for actions like `MAP` and `FILTER`, where we can look at every individual item, check the conditional, apply the corresponding action, and then yield the modified item if it was not filtered out. Other actions, for instance `DEDUPLICATION` or `SORT`, can't be applied to an individual item, but we do allow them in conditionals.
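For the per-item case, the semantics can be sketched with a small base-only function. `perItemConditional` is a hypothetical name (not from the article); it models a per-item action as `a -> Maybe a`, where `Nothing` means the item was filtered out:

```haskell
import Data.Maybe (mapMaybe)

-- Per-item interpretation of a conditional, as it works for MAP and FILTER:
-- check the condition per item and apply the corresponding action.
perItemConditional :: (a -> Bool) -> (a -> Maybe a) -> (a -> Maybe a) -> [a] -> [a]
perItemConditional cond thenA elseA =
  mapMaybe (\x -> if cond x then thenA x else elseA x)

main :: IO ()
main =
  -- e.g. IF (> 10) THEN double ELSE filter out
  print (perItemConditional (> 10) (Just . (* 2)) (const Nothing) [12, 3, 25])
```

Note that this pointwise reading is exactly what breaks down for actions like `SORT`, which need to see all items at once.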
The exact interpretation is therefore a bit counter-intuitive to what programmers might expect, because we actually:

1. Process all items to see if they match the condition
2. Send all items that match the condition to `action1`, which may or may not immediately yield output items
3. Send all items that don't match the condition to `action2`, which may or may not immediately yield output items

This means that we need to partition the items based on the condition, evaluate the two branches, and recombine the results. All of this needs to have a deterministic order and needs to be done as fast as possible. One can think of the behavior as the following code:

```haskell
conditional condition thenActions elseActions = \items -> do
  let (trueItems, falseItems) = partition condition items
  items1 <- thenActions trueItems
  items2 <- elseActions falseItems
  pure (items1 <> items2)
```

To precisely explain how conditionals are implemented, we'll consider two categories of actions that are allowed in conditionals:

- *Streaming actions* like `MAP` and `FILTER` process one item at a time. Any input that they consume is directly translated into the corresponding output (or a lack of an output for a filter), so they're not allowed to remember some items and yield them later. If we pull on the output of the action, we expect it to pull on its input, without any buffering in between.
- *Aggregating actions* consume all the input items before they start yielding any output. We discussed aggregations in the previous blog post; they include things like deduplication, sorting, and grouping. The items are buffered in some data structure.

Based on the actions contained in a conditional, we determine whether it is an *aggregating conditional* or a *streaming conditional*, and we pick an evaluation strategy accordingly.

## Aggregating conditionals

We use this regime if one of the branches contains at least one aggregating action.
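A pure, list-level version of the pseudocode above makes the resulting item order concrete. This is only an illustrative sketch (it drops the monadic context and runs on plain lists), not the streaming implementation:

```haskell
import Data.List (partition)

-- Pure sketch of the conditional semantics: partition on the condition,
-- run each branch on its half, then concatenate then-results before
-- else-results.
conditional :: (a -> Bool) -> ([a] -> [a]) -> ([a] -> [a]) -> [a] -> [a]
conditional cond thenActions elseActions items =
  let (trueItems, falseItems) = partition cond items
  in thenActions trueItems <> elseActions falseItems

main :: IO ()
main =
  -- e.g. IF (> 10) THEN reverse ELSE keep as-is
  print (conditional (> 10) reverse id [12, 3, 25, 7, 18])
```

Note how all matching items come out before all non-matching items, regardless of their interleaving in the input; this is the deterministic order the aggregating strategy below is designed to reproduce.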
The main property of an aggregating action is that it already has an implicit buffer. We'll use this "free" buffer (it doesn't cost us any extra) to make sure the results are deterministic.

The assumption we are making is that there should always be at least one aggregating action in the *else*-branch. If there is an aggregating action only in the *then*-branch, we rewrite the conditional to move the aggregating action into the *else*-branch:

```
IF not condition THEN else-branch ELSE then-branch
```

We now have a conditional where the *else*-branch contains at least one aggregating action. Both branches can additionally contain more streaming, aggregating, or any other actions. Our goal now is to write a function like so:

```haskell
aggregatingConditional
  :: (Item -> IO Bool)
  -> ConduitT (WorkOr ParallelStream) (WorkOr ParallelStream) IO ()
  -> ConduitT (WorkOr ParallelStream) (WorkOr ParallelStream) IO ()
  -> ConduitT (WorkOr ParallelStream) (WorkOr ParallelStream) IO ()
```

The conditional takes `WorkOr ParallelStream`s as the input, so there are a few cases that we need to deal with. The simplest case is when we get a `WOWork` parallel work unit: we can simply pass it along to the output without having to go through the conditional branches. This is slightly more efficient, but more importantly it prevents duplication of work.

When we get a `ParallelStream` instead, we want to construct two `ParallelStream`s. By applying the condition to all the items, we build one `ParallelStream` with just the values for the then-branch and another with just the values for the else-branch. We effectively have three branches:

- The pass-around branch for `WOWork` units from the input
- The then-branch, which produces `WorkOr ParallelStream` outputs
- The else-branch, which also produces `WorkOr ParallelStream` outputs

The outputs of all of these branches are combined by simply interleaving them in whatever order the outputs come.
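The routing of a single input element to the three branches can be sketched with a simplified, base-only model. This is not the conduit implementation: `WOWork` carries a label instead of an `IO ()` so it can be shown and compared, `route` is a hypothetical name, and `WONothingYet` handling is cruder here than in the real evaluator (see the next section):

```haskell
import Data.List (partition)

-- Simplified model of WorkOr for illustration only.
data WorkOr a = WOWork String | WOValue a | WONothingYet
  deriving (Eq, Show)

-- Route one input element: work units are passed around untouched,
-- value chunks are split by the condition into then- and else-items.
route :: (item -> Bool) -> WorkOr [item]
      -> (Maybe (WorkOr [item]), [item], [item])
route _    (WOWork w)   = (Just (WOWork w), [], [])  -- pass-around branch
route cond (WOValue xs) = let (ts, fs) = partition cond xs
                          in (Nothing, ts, fs)       -- feed both branches
route _    WONothingYet = (Just WONothingYet, [], [])

main :: IO ()
main = print (route even (WOValue [1 .. 6 :: Int]))
```

In the real version the two item lists become `ParallelStream`s that are fed to the then- and else-branch conduits respectively.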
For parallel work units this is certainly valid, because they can be executed in any order: the order in which we evaluate work doesn't influence the results. For the `ParallelStream`s we have to be very careful though, because we want a deterministic ordering that is consistent between runs and that doesn't rely on specifics like chunk sizes. That's why we require the else-branch to have at least one aggregation: then we always get all the streams from the then-branch before we get the first stream from the else-branch. The deterministic order is then that all values where the condition holds come before the values where it doesn't (and otherwise items are ordered the same as in the input).

## Zipping and WONothingYet

Combining the three streams is easier said than done. The logical route would be to use a `ZipConduit` for this. A `ZipConduit` duplicates each input to multiple conduits and then combines the outputs. They interleave the outputs in a left-biased manner, meaning that if multiple conduits have an output available (they're in the `HaveOutput` state), the output from the