Merge
Services in Fluvio can be defined with multiple sources and sinks. In this example, we will implement a service that takes in multiple sources via a merge. The example simulates buying and selling stocks. There is a topic buy and another topic sell, which are aggregated to create a log of buy and sell orders. The visual shows the dataflow we will implement.
[Figure: Visual of defined dataflow]
Prerequisites
This guide uses a local Fluvio cluster. If you need to install it, please follow the instructions here.
Transformation
We can add a transform operator to our source list.
sources:
  - type: topic
    id: (...)
    transforms:
      - operator: map
        run: |
          (... map function ...)
  (... more topics ...)
In our case, we have two topics, buy and sell, that read JSON objects containing a name, amount, and price. We will map each JSON object into a string that is sent to the topic message, which logs the orders.
sources:
  - type: topic
    id: buy
    transforms:
      - operator: map
        run: |
          fn buy_order(order: Order) -> Result<String> {
            Ok(format!("+ Buy Order for {}x{} at {}", order.name, order.amount, order.price))
          }
  - type: topic
    id: sell
    transforms:
      - operator: map
        run: |
          fn sell_order(order: Order) -> Result<String> {
            Ok(format!("- Sell Order for {}x{} at {}", order.name, order.amount, order.price))
          }
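To try the two map functions outside of a dataflow, the sketch below reproduces them as plain Rust. It is only an approximation: the Order struct and the Result alias are declared by hand here as assumptions, standing in for whatever the SDF runtime provides when it builds the service.

```rust
// Hand-written stand-in for the `order` type described in the dataflow
// (assumption: in a real dataflow this type is not declared by the user).
#[derive(Debug, Clone)]
struct Order {
    name: String,
    amount: u32,
    price: f32,
}

// Stand-in for the `Result` alias available inside a `run` block (assumption).
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

// Same body as the map attached to the `buy` source.
fn buy_order(order: Order) -> Result<String> {
    Ok(format!("+ Buy Order for {}x{} at {}", order.name, order.amount, order.price))
}

// Same body as the map attached to the `sell` source.
fn sell_order(order: Order) -> Result<String> {
    Ok(format!("- Sell Order for {}x{} at {}", order.name, order.amount, order.price))
}
```

Each function takes one record from its own source and returns one string, so both sources end up producing the same output type even though each is mapped separately.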
Running the Example
Copy and paste the following config and save it as dataflow.yaml.
# dataflow.yaml
apiVersion: 0.5.0
meta:
  name: merge-example
  version: 0.1.0
  namespace: examples
config:
  converter: json
types:
  order:
    type: object
    properties:
      name:
        type: string
      amount:
        type: u32
      price:
        type: f32
topics:
  buy:
    schema:
      value:
        type: order
  sell:
    schema:
      value:
        type: order
  message:
    schema:
      value:
        type: string
services:
  mergeservice:
    sources:
      - type: topic
        id: buy
        transforms:
          - operator: map
            run: |
              fn buy_order(order: Order) -> Result<String> {
                Ok(format!("+ Buy Order for {}x{} at {}", order.name, order.amount, order.price))
              }
      - type: topic
        id: sell
        transforms:
          - operator: map
            run: |
              fn sell_order(order: Order) -> Result<String> {
                Ok(format!("- Sell Order for {}x{} at {}", order.name, order.amount, order.price))
              }
    sinks:
      - type: topic
        id: message
To run the example:
$ sdf run
Produce JSON objects to each of the topics:
$ echo '{"name":"AMZN","amount":20,"price":173.33}' | fluvio produce buy
$ echo '{"name":"TSLA","amount":20,"price":219.41}' | fluvio produce sell
Consume the topic message to retrieve the result:
$ fluvio consume message -Bd
"+ Buy Order for AMZNx20 at 173.33"
"- Sell Order for TSLAx20 at 219.41"
Both the buy order and the sell order have been mapped into strings and logged.
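As a rough mental model of the merge, the sketch below uses two threads that each transform their own input and forward the results to one shared channel. This is a conceptual illustration only, not how the service is implemented. The merge_sources function and its variable names are hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

// Conceptual model only: each source applies its own map, and every mapped
// record is forwarded to a single shared sink. All names are hypothetical.
fn merge_sources(buys: Vec<String>, sells: Vec<String>) -> Vec<String> {
    let (sink, log) = mpsc::channel::<String>();

    // First source: label each record as a buy before forwarding it.
    let buy_sink = sink.clone();
    let buy_worker = thread::spawn(move || {
        for record in buys {
            buy_sink.send(format!("+ Buy Order for {}", record)).unwrap();
        }
    });

    // Second source: label each record as a sell before forwarding it.
    let sell_worker = thread::spawn(move || {
        for record in sells {
            sink.send(format!("- Sell Order for {}", record)).unwrap();
        }
    });

    buy_worker.join().unwrap();
    sell_worker.join().unwrap();

    // Both senders were dropped when the workers finished, so iteration ends.
    log.iter().collect()
}
```

In this sketch, the relative order of records from the two sources depends on thread scheduling, so only the set of messages in the sink is predictable, not their interleaving.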
Cleanup
Exit the sdf terminal and clean up. The --force flag removes the topics:
$ sdf clean --force
Conclusion
In this example, we covered how to use merge to allow services to consume multiple sources.