
Using Macrometa Stream Workers In A Real-Time Log Analytics Dashboard


BAM! Simplifying Complex Event Processing

In our previous blog in this series, we took a look at the high-level architecture of our log analytics dashboard example application that we developed with our partner Fastly. In this article, we look at the Stream Workers that are used for Complex Event Processing of data in motion.

Macrometa provides developers with a simple way of performing Complex Event Processing using Stream Workers. Macrometa Stream Workers give you “superpowers” that allow you to transform data from one data type to another, enrich data by combining it with other databases or data services, aggregate streams, clean and filter data, and more.

Stream Workers are written in Macrometa Stream QL. This domain-specific language combines the simple query language of SQL with declarative stream, source, and sink definitions. It also supports authoring arbitrary helper functions in JavaScript.

Let’s walk through the basics of the Stream Workers code in the Fastly log analytics demo application. We go over code patterns used in these Stream Worker examples, and you can view our documentation for more source code details.

In our Fastly log processing application, we use three Stream Workers. One sends HTTP requests to the Fastly service every three seconds to generate logs to analyze, and the other two are responsible for reading and aggregating those Fastly web logs. For more information about the architecture of this log processing application, see the previous blog.

Stream Worker Fastly-log-generator (code)

This Stream Worker uses a trigger to make three HTTP requests to the Fastly service every three seconds.

Annotation to name the stream worker application:

@App:name("fastly-log-generator")
@App:description("fastly-log-generator")

Define a trigger that runs the stream worker every three seconds. On each trigger event, the stream worker makes HTTP requests to the Fastly service to populate the Macrometa FastlyGetLogGenerator, FastlyPostLogGenerator, FastlyPutLogGenerator, and FastlyInitCollectionGenerator streams.

define trigger FastlyLogGeneratorTrigger at every 3 sec;

The following stanza creates the stream definition for FastlyGetLogGenerator. A sink for the stream is created using a sink annotation indicating that this is an HTTP sink, along with a map annotation that defines the sink's output format, in this case JSON. A sink, in the context of stream processing, is used to pass transformed data downstream for storage or additional processing.

Additional streams are defined for other HTTP methods for which we want to generate Fastly logs.

-- http sink used for making HTTP request with GET method
@sink(type='http', publisher.url='{{url}}', method='GET', @map(type='json'))
define stream FastlyGetLogGenerator (url string);

The Stream QL statement below emits a URL for each FastlyLogGeneratorTrigger event and inserts it into the FastlyGetLogGenerator stream (defined above). This causes the FastlyGetLogGenerator sink to issue an HTTP GET request to the provided URL.

select "https://FASTLY_COMPUTE_EDGE_SERVICE_URL/collections/key" as url
from FastlyLogGeneratorTrigger
insert into FastlyGetLogGenerator;
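For readers who prefer to reason about this in plain JavaScript, the trigger-and-sink pairing above behaves roughly like a timer that emits one record per tick, which the sink then turns into an HTTP request. This is only an illustrative sketch, not Macrometa's implementation; the URL is a placeholder, as in the Stream QL snippet:

```javascript
// Rough JavaScript picture of the trigger + HTTP sink pairing above.
// The URL is a placeholder, exactly as in the Stream QL snippet.
const url = "https://FASTLY_COMPUTE_EDGE_SERVICE_URL/collections/key";

// Each trigger event becomes one record flowing into the sink stream.
function onTriggerEvent() {
  return { url };
}

// The "at every 3 sec" trigger corresponds to a periodic timer; the sink
// would then issue a GET request for each record, e.g.:
// setInterval(() => fetch(onTriggerEvent().url, { method: "GET" }), 3000);
```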

Stream Worker fastly-http-request-worker (code)

This Stream Worker reads Fastly logs from the `fastly_logs` collection and reorders out-of-order logs. It also determines, based on the log timestamp, which 10-second window each log falls into and adds that window timestamp as an additional field.

Annotation to name the stream worker application:

@App:name("fastly-http-request-worker")
@App:description("fastly-http-request-worker")
define function getTime[javascript] return string {
    const date = new Date(data[0]);
    const isRequestMinute = JSON.parse(data[1]);

    if (isRequestMinute) {
        // Round the seconds down to the nearest multiple of 10 to get
        // the start of the 10-second window this log falls into.
        const bucketSecond = Math.floor(date.getSeconds() / 10) * 10;
        return date.setSeconds(bucketSecond).toString();
    }
    return date.getTime().toString();
};
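To see the bucketing behavior in isolation, here is the same logic as a standalone JavaScript function, exercised with a hypothetical sample timestamp:

```javascript
// Standalone version of the bucketing logic in getTime above.
// Rounds a timestamp's seconds down to the nearest multiple of 10 and
// returns the resulting epoch milliseconds as a string (setSeconds
// returns the updated timestamp in milliseconds).
function bucketTo10Seconds(timestamp) {
  const date = new Date(timestamp);
  const bucketSecond = Math.floor(date.getSeconds() / 10) * 10;
  return date.setSeconds(bucketSecond).toString();
}

// Example: a log stamped at 12:34:56 lands in the 12:34:50 bucket.
const bucketed = bucketTo10Seconds(Date.UTC(2023, 0, 1, 12, 34, 56));
console.log(new Date(Number(bucketed)).getUTCSeconds()); // 50
```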

Define an input stream called LogRecords with a c8db (Macrometa database collection) source and specify the fields and data types.

@source(type='c8db', collection='fastly_logs', @map(type='json'))
define stream LogRecords(timestamp string, request_method string, 
response_status int, url string, response_body_size long, time_elapsed long, 
geo_country string, client_ip string);

Define an output stream called IntermediateStream with a c8streams sink.

@sink(type="c8streams", stream="fastly-intermediate-stream", replication.type="global",
@map(type='json'))
define stream IntermediateStream(request_10sec_timeframe long,
request_timeStamp long, request_method string, response_status int, url string,
response_body_size long, latency long, country string, client_ip string);

The following Stream QL query reads from the LogRecords stream, adds the request_10sec_timeframe and request_timeStamp fields, strips any query string from the url field, and inserts the results into LogRecordsReorderingStream.

select
	convert(getTime(timestamp, true), "long") as request_10sec_timeframe,
	convert(getTime(timestamp, false), "long") as request_timeStamp,
	request_method,
	response_status,
	str:split(url, "\?", 0) as url,
	response_body_size,
	time_elapsed as latency,
	geo_country as country,
	client_ip
from LogRecords
insert into LogRecordsReorderingStream;
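The str:split call keeps only the portion of the URL before any query string. In JavaScript terms it is roughly the following, shown with a hypothetical sample path:

```javascript
// Rough JavaScript equivalent of str:split(url, "\?", 0): split on "?"
// and keep the first segment, dropping any query string.
function stripQueryString(url) {
  return url.split("?")[0];
}

console.log(stripQueryString("/products/list?page=2&sort=asc")); // "/products/list"
```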

Use the reorder:kslack stream processor, which implements the K-Slack algorithm, to reorder out-of-order events and insert the results into IntermediateStream, defined above.

select *
from LogRecordsReorderingStream#reorder:kslack(request_timeStamp, 10000l)
insert into IntermediateStream;
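K-Slack buffers events and releases them in timestamp order once they are older, relative to the highest timestamp seen, than the configured slack (10,000 ms here). A simplified JavaScript sketch of the idea, not Macrometa's implementation:

```javascript
// Simplified K-Slack-style reordering buffer (illustration only).
// Events are buffered and sorted by timestamp, then released once the
// highest timestamp seen exceeds them by more than `slackMs`.
function makeKSlackBuffer(slackMs) {
  const buffer = [];
  let maxSeen = -Infinity;
  return function push(event) {
    buffer.push(event);
    maxSeen = Math.max(maxSeen, event.timestamp);
    buffer.sort((a, b) => a.timestamp - b.timestamp);
    const released = [];
    while (buffer.length && buffer[0].timestamp <= maxSeen - slackMs) {
      released.push(buffer.shift());
    }
    return released; // events emitted in timestamp order
  };
}

// Out-of-order events arriving within the slack window come out ordered.
const push = makeKSlackBuffer(10000);
push({ timestamp: 1000 });
push({ timestamp: 500 });               // late event, inside the slack window
const out = push({ timestamp: 12000 }); // advances time past the slack
console.log(out.map(e => e.timestamp)); // [ 500, 1000 ]
```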

Stream Worker fastly-http-request-stats-1m-worker (code)

This Stream Worker reads logs from `fastly-intermediate-stream` and aggregates them within a 10-second window, computing aggregates over the response_status, url, response_body_size, and latency fields.

Annotation to name the stream worker application:

@App:name("fastly-http-request-stats-1m-worker")
@App:description("fastly-http-request-stats-1m-worker")

The following lines set up the IntermediateStream source stream from which to read the bucketed and ordered log data.

@source(type="c8streams", stream.list="fastly-intermediate-stream", replication.type="global",
@map(type='json'))
define stream IntermediateStream(request_10sec_timeframe long,
request_timeStamp long, request_method string, response_status int, url string,
response_body_size long, latency long, country string, client_ip string);

Create a reference to the HttpResponseCodeStats1min c8db Macrometa database collection.

@store(type="c8db", collection="fastly_http_response_code_stats_1m", @map(type='json'))
define table HttpResponseCodeStats1min(request_10sec_timeframe long, response_status int, 
count long);

Create the HttpResponseCodeStats1minStream stream to publish the aggregate status code counts.

@sink(type="c8streams", stream="fastly_http_response_code_stats_1m", replication.type="global",
@map(type='json'))
define stream HttpResponseCodeStats1minStream(request_10sec_timeframe long,
response_status int, count long);

Stream QL that aggregates response status codes into 10-second buckets using a window function. These aggregates are then written to HttpResponseCodeStats1minStream.

-- Counts the total number of requests for a response_status for every 10 sec
SELECT
	request_10sec_timeframe,
	response_status,
	count() as count
FROM IntermediateStream#window.externalTimeBatch(request_timeStamp, 10 sec, 
request_10sec_timeframe)
GROUP BY response_status
INSERT INTO HttpResponseCodeStats1minStream;
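The windowed GROUP BY above can be pictured in plain JavaScript as grouping events by their precomputed 10-second bucket and counting occurrences of each status code. This is a simplified sketch with hypothetical sample events, ignoring the streaming window mechanics:

```javascript
// Simplified picture of the windowed aggregation above: group events by
// (10-second bucket, response_status) and count occurrences of each pair.
function countByStatus(events) {
  const counts = new Map(); // key: "bucket|status" -> count
  for (const e of events) {
    const key = `${e.request_10sec_timeframe}|${e.response_status}`;
    counts.set(key, (counts.get(key) || 0) + 1);
  }
  return [...counts.entries()].map(([key, count]) => {
    const [bucket, status] = key.split("|");
    return {
      request_10sec_timeframe: Number(bucket),
      response_status: Number(status),
      count,
    };
  });
}

const rows = countByStatus([
  { request_10sec_timeframe: 0, response_status: 200 },
  { request_10sec_timeframe: 0, response_status: 200 },
  { request_10sec_timeframe: 0, response_status: 404 },
]);
console.log(rows); // two 200s and one 404 in the same bucket
```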

The code writes additional aggregates and summary data to Macrometa collections. With the patterns covered in this article, the rest of the code should be straightforward to follow.

Conclusion

The Fastly log analytics application only scratches the surface of what can be accomplished with Macrometa Stream Workers. A rich selection of functions is available in Stream QL to facilitate Complex Event Processing.

For more information, see the Macrometa Stream Workers documentation. If you are new to Macrometa and would like to build your own superhero adventure, sign up for a free developer account and see everything you can do in one platform.

In our next blog in the series, we will be covering the use of Macrometa Query Workers in the log analytics application.
