
BAM! Simplifying Complex Event Processing

In the previous post in this series, we looked at the high-level architecture of the log analytics dashboard example application that we developed with our partner Fastly. In this article, we look at the Stream Workers used for Complex Event Processing (CEP) of data in motion.

Macrometa provides developers with a simple way to perform Complex Event Processing using Stream Workers. Stream Workers give you “superpowers”: they let you transform data from one data type to another, enrich data by combining it with other databases or data services, aggregate streams, clean and filter data, and more.

Stream Workers are written in Macrometa Stream QL. This domain-specific language combines SQL-style queries with declarative stream, source, and sink definitions. It also supports authoring arbitrary helper functions in JavaScript.

Let’s walk through the basics of the Stream Worker code in the Fastly log analytics demo application. We cover the code patterns these Stream Workers use; see our documentation for more details on the source code.

Our Fastly log processing application uses three Stream Workers. One sends HTTP requests to the Fastly service every three seconds to generate logs to analyze, and the other two read and aggregate those Fastly web logs. For more information about the architecture of this log processing application, see the previous post.

Stream Worker Fastly-log-generator (code)

This Stream Worker uses a trigger to make three HTTP requests to the Fastly service every three seconds.

Annotation to name the stream worker application:

Define a trigger that runs the Stream Worker every three seconds. The Stream Worker then makes three HTTP requests to the Fastly service to populate the Macrometa FastlyGetLogGenerator, FastlyPostLogGenerator, FastlyPutLogGenerator, and FastlyInitCollectionGenerator streams.

The following stanza defines the FastlyGetLogGenerator stream. A sink annotation attaches an HTTP sink to the stream, and a map annotation sets the sink's output format, in this case JSON. In stream processing, a sink passes transformed data downstream for storage or further processing.
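To make the sink idea concrete, here is a minimal Python sketch of what an HTTP sink with a JSON map does conceptually: serialize each event as JSON and address it to a configured URL and HTTP method. It is not Macrometa's implementation. The class name `JsonHttpSink`, the placeholder URLs, and the rule that only POST, PUT, and PATCH requests carry a JSON body are assumptions made for illustration.

```python
import json
import urllib.request

# Assumption of this sketch: only these methods carry a JSON request body.
BODY_METHODS = {"POST", "PUT", "PATCH"}


class JsonHttpSink:
    """Hypothetical stand-in for a stream with an HTTP sink and a JSON map."""

    def __init__(self, url: str, method: str = "GET") -> None:
        self.url = url
        self.method = method.upper()

    def to_request(self, event: dict) -> urllib.request.Request:
        """Map the event to JSON and wrap it in an HTTP request without sending it."""
        data = None
        headers = {}
        if self.method in BODY_METHODS:
            # The "map" step: serialize the event's attributes as a JSON payload.
            data = json.dumps(event).encode("utf-8")
            headers["Content-Type"] = "application/json"
        return urllib.request.Request(self.url, data=data, headers=headers, method=self.method)

    def publish(self, event: dict) -> int:
        """The "sink" step: send the request downstream and return the HTTP status code."""
        with urllib.request.urlopen(self.to_request(event), timeout=5) as response:
            return response.status


# Example sinks; the URLs are placeholders, not the demo's Fastly endpoints.
get_sink = JsonHttpSink("https://example.com/", method="GET")
put_sink = JsonHttpSink("https://example.com/items", method="PUT")
get_request = get_sink.to_request({"source": "trigger"})
put_request = put_sink.to_request({"source": "trigger", "id": 1})
```

Keeping request construction separate from `publish` lets you inspect what would be sent without touching the network.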

Additional streams are defined for other HTTP methods for which we want to generate Fastly logs.

The Stream QL statement below retrieves the URL to contact from the FastLogGeneratorTrigger collection and inserts it into the FastlyGetLogGenerator stream defined above. The stream's HTTP sink then connects to that URL using the HTTP GET method.
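The trigger, collection lookup, and stream insert can be simulated in plain Python to show how the pieces connect. In this sketch, a dictionary stands in for the FastLogGeneratorTrigger collection, the `Stream` class and its subscriber callbacks stand in for a stream and its HTTP sink, and the trigger fires on a simulated clock instead of sleeping. All of these names and the example URLs are hypothetical, and the POST and PUT lookups are assumed to mirror the GET one.

```python
from typing import Callable, Dict, List, Tuple

# Hypothetical stand-in for the FastLogGeneratorTrigger collection: one target URL per HTTP method.
TRIGGER_COLLECTION: Dict[str, str] = {
    "GET": "https://example.com/get",
    "POST": "https://example.com/post",
    "PUT": "https://example.com/put",
}


class Stream:
    """Minimal in-memory stream: each inserted event is handed to every subscriber."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.subscribers: List[Callable[[dict], None]] = []

    def insert(self, event: dict) -> None:
        for callback in self.subscribers:
            callback(event)


def on_trigger(tick_ms: int, streams: Dict[str, Stream]) -> None:
    """Look up each method's URL in the collection and insert it into that method's stream."""
    for method, stream in streams.items():
        stream.insert({"url": TRIGGER_COLLECTION[method], "triggered_at_ms": tick_ms})


def simulate_trigger(ticks: int, interval_ms: int, streams: Dict[str, Stream]) -> None:
    """Fire the trigger `ticks` times on a simulated clock, `interval_ms` apart (no sleeping)."""
    for i in range(ticks):
        on_trigger(i * interval_ms, streams)


# Record what each stream's HTTP sink would request instead of sending anything.
sent: List[Tuple[str, str, int]] = []
streams = {m: Stream(f"Fastly{m.capitalize()}LogGenerator") for m in TRIGGER_COLLECTION}
for m, s in streams.items():
    s.subscribers.append(lambda e, m=m: sent.append((m, e["url"], e["triggered_at_ms"])))

simulate_trigger(ticks=2, interval_ms=3000, streams=streams)
```

With two ticks three seconds apart, each of the three streams receives two events, so six requests are recorded.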

Stream Worker fastly-http-request-worker (code)

This Stream Worker reads Fastly logs from the `logs` collection and reorders logs that arrive out of order. Based on each log's timestamp, it also determines the 10-second window the log falls into and adds that window's timestamp as an additional field.

Annotation to name the stream worker application:

Stream Workers let developers define arbitrary helper functions in JavaScript. The getTime function buckets timestamps into 10-second intervals.
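The demo's getTime helper is written in JavaScript and is not reproduced here. As a rough Python analogue, assuming timestamps are epoch milliseconds, bucketing into 10-second intervals means flooring each timestamp to the nearest lower multiple of 10,000 ms. The name `get_time` and the millisecond representation are assumptions.

```python
BUCKET_MS = 10_000  # 10-second buckets, in milliseconds


def get_time(timestamp_ms: int, bucket_ms: int = BUCKET_MS) -> int:
    """Hypothetical Python analogue of getTime: floor an epoch-millisecond
    timestamp to the start of its 10-second bucket."""
    return timestamp_ms - timestamp_ms % bucket_ms
```

For example, `get_time(1654600003123)` returns `1654600000000`, so every log from that 10-second span shares one bucket key.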

Define an input stream called LogRecords with a c8db (Macrometa database collection) source and specify the fields and datatypes.
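A stream definition declares each field's name and data type. The Python dataclass below is a loose analogy, not Stream QL: it coerces a raw collection document into typed fields. The field names are assumptions based on the fields this article mentions (response_status, url, response_body_size, latency), plus an assumed epoch-millisecond `timestamp`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LogRecord:
    """Hypothetical schema for the LogRecords input stream (field names partly assumed)."""

    timestamp: int  # epoch milliseconds (assumed representation)
    url: str
    response_status: int
    response_body_size: int
    latency: float

    @classmethod
    def from_document(cls, doc: dict) -> "LogRecord":
        """Coerce a raw collection document into the declared field types."""
        return cls(
            timestamp=int(doc["timestamp"]),
            url=str(doc["url"]),
            response_status=int(doc["response_status"]),
            response_body_size=int(doc["response_body_size"]),
            latency=float(doc["latency"]),
        )


record = LogRecord.from_document(
    {"timestamp": "1654600003123", "url": "/a", "response_status": "200",
     "response_body_size": "512", "latency": "0.25"}
)
```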

Define an output stream called IntermediateStream with a c8streams sink.

The following Stream QL query reads from the LogRecords stream and inserts a request_10sec_timeframe field, along with some text processing of the url field.
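The enrichment step can be sketched in Python as a function that adds `request_10sec_timeframe` and normalizes `url`. The article does not say what text processing the demo applies to `url`; stripping the query string and trailing slash here is purely an assumption for illustration, as is the epoch-millisecond `timestamp` field.

```python
from urllib.parse import urlsplit

BUCKET_MS = 10_000  # 10-second timeframe, in milliseconds


def enrich(record: dict) -> dict:
    """Return a copy of record with request_10sec_timeframe added and url normalized.

    Assumptions: timestamp is epoch milliseconds, and the url processing
    (drop the query string and trailing slash) is illustrative only."""
    ts = int(record["timestamp"])
    path = urlsplit(record["url"]).path.rstrip("/") or "/"
    return {**record, "url": path, "request_10sec_timeframe": ts - ts % BUCKET_MS}


raw = {"timestamp": 1654600003123, "url": "/products/42/?ref=home", "response_status": 200}
enriched = enrich(raw)
```

Returning a new dict rather than mutating the input mirrors how a query emits a new event into the output stream.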

The Reorder stream processing function applies the kslack (K-slack) algorithm to reorder out-of-order events, and the results are inserted into IntermediateStream, defined above.
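K-slack reordering buffers events and releases each one only once it is at least K time units older than the newest timestamp seen so far, with K widening to the largest lateness observed. The Python class below is a simplified sketch of that idea, not the Stream QL Reorder function's implementation; the `initial_k` parameter and the `timestamp` field name are assumptions. Because K only widens after a late event is observed, an event later than the current slack can still be emitted out of order.

```python
import heapq
from typing import List, Optional, Tuple


class KSlackReorderer:
    """Simplified adaptive K-slack reordering (illustrative sketch).

    Events are buffered in a min-heap keyed by timestamp and released once
    timestamp <= max_ts - k, where max_ts is the newest timestamp seen and
    k widens to the largest lateness observed so far."""

    def __init__(self, initial_k: int = 0) -> None:
        self.k = initial_k
        self.max_ts: Optional[int] = None
        self._heap: List[Tuple[int, int, dict]] = []
        self._seq = 0  # tie-breaker: equal timestamps keep arrival order

    def push(self, event: dict) -> List[dict]:
        """Buffer one event and return any events that are now safe to release, in order."""
        ts = event["timestamp"]
        if self.max_ts is None or ts > self.max_ts:
            self.max_ts = ts
        else:
            # Late (or equal) event: widen the slack to cover its delay.
            self.k = max(self.k, self.max_ts - ts)
        heapq.heappush(self._heap, (ts, self._seq, event))
        self._seq += 1
        released = []
        while self._heap and self._heap[0][0] <= self.max_ts - self.k:
            released.append(heapq.heappop(self._heap)[2])
        return released

    def flush(self) -> List[dict]:
        """Release everything still buffered, e.g. when the stream ends."""
        return [heapq.heappop(self._heap)[2] for _ in range(len(self._heap))]


reorderer = KSlackReorderer(initial_k=2000)
ordered: List[dict] = []
for ts in [1000, 3000, 2000, 5000, 4000, 9000]:
    ordered.extend(reorderer.push({"timestamp": ts}))
ordered.extend(reorderer.flush())
```

A larger slack trades latency for ordering: events wait longer in the buffer, but fewer of them escape out of order.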

Stream Worker fastly-http-request-stats-1m-worker (code)

This Stream Worker reads logs from `fastly-intermediate-stream` and aggregates them within 10-second windows. It computes aggregates for response_status, url, response_body_size, and latency.

Annotation to name the stream worker application:

The following lines set up the IntermediateStream source stream from which to read the bucketed and ordered log data.

Create a reference to the HttpResponseCodeStats1min c8db Macrometa database collection.

Create the HttpResponseCodeStats1minStream stream to store the aggregate status code counts.

The following Stream QL uses a window function to aggregate response status codes into 10-second buckets, then writes these aggregates to HttpResponseCodeStats1minStream.
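A tumbling-window count of status codes can be sketched in Python as follows. This illustrates the aggregation, not the Stream QL window function itself. It assumes events arrive in timestamp order (as they would after the reordering step) and already carry the `request_10sec_timeframe` bucket; a window's counts are emitted when the first event from a later window arrives. The class name and output field names are hypothetical.

```python
from collections import Counter
from typing import List, Optional


class TumblingStatusCounter:
    """Count response_status values per 10-second window (illustrative sketch).

    Assumes events arrive in timestamp order and already carry
    request_10sec_timeframe; a window's counts are emitted when the
    first event from a later window arrives."""

    def __init__(self) -> None:
        self.current: Optional[int] = None
        self.counts: Counter = Counter()

    def push(self, event: dict) -> List[dict]:
        """Add one event; return the previous window's rows if this event closes it."""
        window = event["request_10sec_timeframe"]
        rows: List[dict] = []
        if self.current is not None and window != self.current:
            rows = self._emit()
        self.current = window
        self.counts[event["response_status"]] += 1
        return rows

    def flush(self) -> List[dict]:
        """Emit the still-open window, e.g. at shutdown."""
        return self._emit() if self.counts else []

    def _emit(self) -> List[dict]:
        rows = [
            {"request_10sec_timeframe": self.current, "response_status": status, "count": n}
            for status, n in sorted(self.counts.items())
        ]
        self.counts = Counter()
        return rows


counter = TumblingStatusCounter()
output: List[dict] = []
for window, status in [(0, 200), (0, 200), (0, 404), (10_000, 200), (10_000, 500)]:
    output.extend(counter.push({"request_10sec_timeframe": window, "response_status": status}))
output.extend(counter.flush())
```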

The code writes additional aggregates and summary data to Macrometa collections. With the patterns covered in this article, the rest of the code should be straightforward to understand.

Conclusion

The Fastly log analytics application only scratches the surface of what Macrometa Stream Workers can do. Stream QL offers a rich selection of functions for Complex Event Processing.

For more information, see the Macrometa Stream Workers documentation. If you are new to Macrometa and would like to build your own superhero adventure, sign up for a forever-free developer account and see everything you can do in one platform.

In the next post in this series, we cover how the log analytics application uses Macrometa Query Workers.

Posted Jun 7, 2022 in Tutorial category
