
StreamingDataFrame.concat(): concatenating multiple topics into a stream

Use StreamingDataFrame.concat() to combine two or more topics into a new stream containing all the elements from all the topics.

Use it when you need:

  • To process multiple topics as a single stream.
  • To combine the branches of the same StreamingDataFrame back together.

Examples

Example 1: Aggregate e-commerce orders from different locations into one stream and calculate the average order size in 1h windows.

from datetime import timedelta

from quixstreams import Application
from quixstreams.dataframe.windows import Mean

app = Application(...)

# Define the orders topics 
topic_uk = app.topic("orders-uk")
topic_de = app.topic("orders-de")

# Create StreamingDataFrames for each location
orders_uk = app.dataframe(topic_uk)
orders_de = app.dataframe(topic_de)

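# Simulate the currency conversion step for each topic before concatenating them.
# `convert_currency()` is a placeholder for your own function that returns a callable converting amounts between the two currencies.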
orders_uk["amount_usd"] = orders_uk["amount"].apply(convert_currency("GBP", "USD"))
orders_de["amount_usd"] = orders_de["amount"].apply(convert_currency("EUR", "USD"))

# Concatenate the orders from different locations into a new StreamingDataFrame.
# The new dataframe will have all records from both topics.
orders_combined = orders_uk.concat(orders_de)

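# Calculate the average order size in USD within 1h tumbling windows.
# Assign the result so the window becomes part of the pipeline; .final() emits each window once it closes.
orders_combined = (
    orders_combined.tumbling_window(timedelta(hours=1))
    .agg(avg_amount_usd=Mean("amount_usd"))
    .final()
)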


if __name__ == '__main__':
    app.run()
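To make the aggregation concrete, here is a plain-Python sketch of what a 1h tumbling-window mean computes over the concatenated records. It is not how Quix Streams implements windows: the `tumbling_mean()` helper, the `(timestamp_ms, amount_usd)` record shape, and the sample values are hypothetical.

```python
from collections import defaultdict

WINDOW_MS = 60 * 60 * 1000  # 1 hour in milliseconds


def tumbling_mean(records, window_ms=WINDOW_MS):
    """Average the amount per tumbling window.

    `records` is an iterable of (timestamp_ms, amount_usd) tuples (hypothetical shape).
    Returns {window_start_ms: mean_amount}.
    """
    sums = defaultdict(float)
    counts = defaultdict(int)
    for ts, amount in records:
        # Tumbling windows do not overlap: each record belongs to exactly one window.
        window_start = ts - ts % window_ms
        sums[window_start] += amount
        counts[window_start] += 1
    return {start: sums[start] / counts[start] for start in sums}


# Hypothetical orders, already converted to USD.
uk_orders = [(1_000, 10.0), (2_000, 30.0)]
de_orders = [(3_000, 20.0), (WINDOW_MS + 500, 50.0)]

# After concatenation, records from both locations feed the same windows.
print(tumbling_mean(uk_orders + de_orders))  # {0: 20.0, 3600000: 50.0}
```

The first window averages orders from both locations (10, 30 and 20), which is the point of concatenating before aggregating.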

Example 2: Combine branches of the same StreamingDataFrame back together.
See the Branching StreamingDataFrames page for more details about branching.

from quixstreams import Application
app = Application(...)

input_topic = app.topic("orders")
output_topic = app.topic("output")

# Create a dataframe with all orders
all_orders = app.dataframe(input_topic)

# Create branches with DE and UK orders:
orders_de = all_orders[all_orders["country"] == "DE"]
orders_uk = all_orders[all_orders["country"] == "UK"]

# Do some conditional processing for DE and UK orders here
# ...

# Combine the branches back with .concat()
all_orders = orders_de.concat(orders_uk)

# Send data to the output topic
all_orders.to_topic(output_topic)


if __name__ == '__main__':
    app.run()
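A plain-Python model of what this branch-and-concat pipeline does for each message may help: every incoming order passes through both filters, and the concatenated dataframe forwards whatever the branches emit. The `route()` helper, the `processed_by` field, and the sample orders are hypothetical. Note that an order matching neither filter (here, `"FR"`) does not reach the output topic.

```python
def route(order):
    """Return the records the concatenated branches would emit for one order."""
    emitted = []
    # Branch 1: keep only DE orders, then apply DE-specific processing (hypothetical).
    if order["country"] == "DE":
        emitted.append({**order, "processed_by": "de"})
    # Branch 2: keep only UK orders, then apply UK-specific processing (hypothetical).
    if order["country"] == "UK":
        emitted.append({**order, "processed_by": "uk"})
    return emitted


orders = [
    {"id": 1, "country": "DE"},
    {"id": 2, "country": "UK"},
    {"id": 3, "country": "FR"},
]

# Orders are handled one at a time, so the output keeps the input order.
output = [record for order in orders for record in route(order)]
print([(r["id"], r["processed_by"]) for r in output])  # [(1, 'de'), (2, 'uk')]
```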

Message ordering between partitions

When using StreamingDataFrame.concat() to combine different topics, the application's internal consumer goes into a special "buffered" mode.

In this mode, it buffers messages per partition so that it can process them in timestamp order across the different topics.
Timestamp alignment is effective only between partitions with the same number: partition 0 of one topic is aligned with partition 0 of the other topics, but not with partition 1. This constraint exists because each consumer processes only its assigned partitions: a consumer holding partition 0 of both topics can align those two streams, but it cannot see partition 1 if that partition is assigned to another consumer.
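The per-partition alignment rule can be sketched in plain Python with `heapq.merge`. This is a simplified model, not Quix Streams' buffering code: it assumes every message is already buffered and that each topic partition is already in timestamp order. The `align_by_partition()` helper and the sample messages are hypothetical.

```python
import heapq
from collections import defaultdict


def align_by_partition(messages):
    """Merge messages by timestamp, but only among partitions with the same number.

    `messages` is a list of (topic, partition, timestamp) tuples; each topic
    partition is assumed to be in timestamp order already.
    Returns {partition: [(timestamp, topic), ...]}.
    """
    per_topic_partition = defaultdict(list)
    for topic, partition, ts in messages:
        per_topic_partition[(topic, partition)].append((ts, topic))

    # Group the per-topic streams by partition number only.
    streams_by_partition = defaultdict(list)
    for (_topic, partition), stream in per_topic_partition.items():
        streams_by_partition[partition].append(stream)

    # Within one partition number, merge all topics in timestamp order.
    return {p: list(heapq.merge(*streams)) for p, streams in streams_by_partition.items()}


messages = [
    ("A", 0, 1), ("A", 0, 4), ("B", 0, 2), ("B", 0, 3),
    ("A", 1, 0), ("B", 1, 9),
]
aligned = align_by_partition(messages)
print(aligned[0])  # [(1, 'A'), (2, 'B'), (3, 'B'), (4, 'A')]
print(aligned[1])  # [(0, 'A'), (9, 'B')]
```

The message in partition 1 with timestamp 0 is never compared with partition 0's messages; it is only ordered relative to other partition-1 messages.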

Why is this needed?
Consider two topics A and B with the following timestamps:

  • Topic A (partition 0): 11, 15
  • Topic B (partition 0): 12, 17

By default, Kafka does not guarantee the processing order to be 11, 12, 15, 17 because the order is guaranteed only within a single partition.

With timestamp alignment, this order is achievable provided that the messages are already present in the topic partitions. If one topic's producer is delayed, the consumer cannot align messages that have not been written yet, so messages from the other topic may be processed ahead of them, out of timestamp order.
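The effect of a delayed producer can be illustrated with a simplified consumer model: on each poll it processes whatever is buffered, in timestamp order across topics, and does not wait for the other topic. The `consume()` helper and the poll batches are hypothetical, and the real consumer's buffering and waiting behavior may differ.

```python
import heapq


def consume(polls):
    """Process each poll's buffered messages in timestamp order across topics.

    `polls` is a list of dicts mapping topic name -> timestamps that become
    available at that poll. Messages that have not arrived yet cannot be aligned.
    """
    processed = []
    for batch in polls:
        buffered = [[(ts, topic) for ts in sorted(timestamps)] for topic, timestamps in batch.items()]
        processed.extend(heapq.merge(*buffered))
    return [ts for ts, _topic in processed]


# Both topics are fully written before the consumer polls: the order is aligned.
on_time = consume([{"A": [11, 15], "B": [12, 17]}])
print(on_time)  # [11, 12, 15, 17]

# Topic B's producer is late: its messages only show up on the second poll.
late = consume([{"A": [11, 15]}, {"B": [12, 17]}])
print(late)  # [11, 15, 12, 17]
```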

Stateful operations on concatenated dataframes

To perform stateful operations like windowed aggregations on the concatenated StreamingDataFrame, the underlying topics must have the same number of partitions.
The application raises an error when this condition is not met.
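A sketch of the kind of check this implies, using a hypothetical `check_same_partition_count()` helper that receives the partition counts as a plain dict. Quix Streams performs its own validation; its actual error type and message are not reproduced here.

```python
def check_same_partition_count(partition_counts: dict[str, int]) -> int:
    """Return the shared partition count, or raise if the topics disagree."""
    distinct = set(partition_counts.values())
    if len(distinct) != 1:
        raise ValueError(
            f"Concatenated topics must have the same number of partitions: {partition_counts}"
        )
    return distinct.pop()


print(check_same_partition_count({"orders-uk": 4, "orders-de": 4}))  # 4

try:
    check_same_partition_count({"orders-uk": 4, "orders-de": 6})
except ValueError as exc:
    print(exc)
```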

In addition, the message keys must be distributed across partitions using the same partitioning algorithm.
Otherwise, the same key may be routed to different partitions, and therefore to different state stores, leading to incorrect results.
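Why the partitioning algorithm matters can be shown with two toy partitioners. Neither is Kafka's default partitioner: `partitioner_a()`, `partitioner_b()`, and the per-partition counters standing in for state stores are all hypothetical. When two topics place the same key with different algorithms, that key's state is split across two stores.

```python
from collections import defaultdict

NUM_PARTITIONS = 4


def partitioner_a(key: str) -> int:
    # Toy partitioner: code of the first character modulo the partition count.
    return ord(key[0]) % NUM_PARTITIONS


def partitioner_b(key: str) -> int:
    # A different toy partitioner: key length modulo the partition count.
    return len(key) % NUM_PARTITIONS


def count_per_store(events):
    """Count events per key, with one counter dict per partition (a stand-in for state stores)."""
    stores = defaultdict(lambda: defaultdict(int))
    for key, partition in events:
        stores[partition][key] += 1
    return stores


# Same partitioner on both topics: both events for key "b" hit the same store.
consistent = count_per_store([("b", partitioner_a("b")), ("b", partitioner_a("b"))])
print(dict(consistent[2]))  # {'b': 2}

# Different partitioners: "b" lands in partition 2 for one topic and 1 for the other.
split = count_per_store([("b", partitioner_a("b")), ("b", partitioner_b("b"))])
print(dict(split[2]), dict(split[1]))  # {'b': 1} {'b': 1}
```

In the second case neither store sees the full count of 2 for key `"b"`, which is the incorrect result the paragraph above warns about.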