
Sentiment Analysis with Macrometa Stream Workers

The sentiment function is a Macrometa Stream Workers extension that calculates the sentiment score of a given text. It uses the AFINN word list, a collection of words with assigned sentiment scores ranging from -5 (very negative) to 5 (very positive). With this function, you can analyze the sentiment of a piece of text, such as a tweet or a news headline, in real time.

For more technical information, refer to the sentiment:getRate documentation.
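To make the idea of a word-list score concrete, here is a minimal Python sketch of AFINN-style scoring. It assumes the text score is the sum of the scores of the words found in the list, which is a common way to apply AFINN; the source does not state exactly how sentiment:getRate combines word scores. SAMPLE_LEXICON and score_text are hypothetical names, and the word values are illustrative rather than copied from the real AFINN list.

```python
import re

# Hypothetical mini-lexicon in the AFINN style (integer scores from -5 to 5).
# The words and values are illustrative, not copied from the real AFINN list.
SAMPLE_LEXICON = {
    "love": 3,
    "great": 3,
    "good": 2,
    "bad": -2,
    "crash": -2,
    "terrible": -3,
}


def score_text(text, lexicon=SAMPLE_LEXICON):
    """Sum the lexicon scores of the words in text; unknown words count as 0."""
    words = re.findall(r"[a-z']+", text.lower())
    return sum(lexicon.get(word, 0) for word in words)


print(score_text("Great phone, I love it"))       # 3 + 3 = 6
print(score_text("Terrible update, apps crash"))  # -3 + -2 = -5
print(score_text("The meeting is on Tuesday"))    # no lexicon words, so 0
```

Because the score is a sum in this sketch, longer texts can fall outside the -5 to 5 range that applies to individual words.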

Why Use Sentiment Analysis?

Sentiment analysis shows how people feel about a particular topic, product, or event. By scoring text data, you can identify patterns, trends, and potential issues that might require further attention. You can use sentiment analysis to:

  • Monitor customer feedback and reviews
  • Track public opinion on social media
  • Analyze market trends and news headlines
  • Evaluate customer support interactions
  • Identify potential crisis situations

Stream Worker Examples

These examples apply the sentiment function to news headlines, social media posts, and customer reviews. In each one, a stream worker scores events in real time as they arrive.

Example 1: News Headlines Sentiment Analysis

CREATE STREAM NewsHeadlinesStream (headline string);
CREATE SINK STREAM SentimentAnalysisStream (headline string, sentimentScore int);

@info(name = 'simpleSentimentAnalysis')
INSERT INTO SentimentAnalysisStream
SELECT headline, sentiment:getRate(headline) AS sentimentScore
FROM NewsHeadlinesStream;

In this example, a stream worker processes news headlines from the NewsHeadlinesStream source, computes their sentiment scores, and outputs the results to the SentimentAnalysisStream sink.

The simpleSentimentAnalysis query reads each incoming event from NewsHeadlinesStream, calls sentiment:getRate(headline) to compute the headline's sentiment score, and inserts the headline together with the computed sentimentScore into SentimentAnalysisStream.
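The query acts as a per-event transformation: each headline that arrives produces exactly one output event carrying the headline and its score. The Python sketch below models that shape with a generator. enrich_with_sentiment and toy_get_rate are hypothetical names, and toy_get_rate is only a stand-in so the sketch runs; it does not reproduce sentiment:getRate.

```python
def enrich_with_sentiment(headlines, get_rate):
    """Yield one (headline, sentimentScore) pair per incoming headline."""
    for headline in headlines:
        yield headline, get_rate(headline)


# Hypothetical stand-in for sentiment:getRate, used only to make the sketch run.
def toy_get_rate(text):
    lowered = text.lower()
    return lowered.count("gain") - lowered.count("loss")


incoming = [
    "Markets gain after report",
    "Retailer posts loss",
    "Council meets today",
]
for headline, score in enrich_with_sentiment(incoming, toy_get_rate):
    print(headline, score)
```

The generator consumes headlines lazily, one at a time, which mirrors how a stream worker handles events as they arrive rather than waiting for a complete data set.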

Example 2: Social Media Posts Sentiment Analysis

CREATE STREAM SocialMediaPostsStream (timestamp long, username string, post string);

CREATE SINK STREAM SentimentAnalysisStream (timestamp long, username string, post string, sentimentScore int);

@info(name = 'socialMediaSentimentAnalysis')
INSERT INTO SentimentAnalysisStream
SELECT timestamp, username, post, sentiment:getRate(post) AS sentimentScore
FROM SocialMediaPostsStream;

In this example, a stream worker processes social media posts from the SocialMediaPostsStream source, computes their sentiment scores, and outputs the results to the SentimentAnalysisStream sink.

The socialMediaSentimentAnalysis query reads each incoming event from SocialMediaPostsStream, calls sentiment:getRate(post) to compute the post's sentiment score, and inserts the timestamp, username, post, and computed sentimentScore into SentimentAnalysisStream.

Example 3: Customer Reviews Sentiment Analysis and Aggregation

CREATE STREAM CustomerReviewsStream (productId string, timestamp long, username string, review string);

CREATE TABLE AverageSentimentTable (_key string, avgSentiment double);

CREATE SINK STREAM SentimentAnalysisStream (productId string, timestamp long, username string, review string, sentimentScore int);

@info(name = 'customerReviewsSentimentAnalysis')
INSERT INTO SentimentAnalysisStream
SELECT productId, timestamp, username, review, sentiment:getRate(review) AS sentimentScore
FROM CustomerReviewsStream;

@info(name = 'averageSentimentPerProduct')
INSERT INTO AverageSentimentTable
SELECT productId AS _key, avg(sentimentScore) AS avgSentiment
FROM SentimentAnalysisStream WINDOW TUMBLING_LENGTH(5)
GROUP BY productId;

@info(name = 'averageSentimentPerProductUpdate')
UPDATE AverageSentimentTable
SET AverageSentimentTable._key = _key, AverageSentimentTable.avgSentiment = avgSentiment
ON AverageSentimentTable._key == _key
SELECT productId AS _key, avg(sentimentScore) AS avgSentiment
FROM SentimentAnalysisStream WINDOW TUMBLING_LENGTH(5)
GROUP BY productId;

In this example, a stream worker processes customer reviews from the CustomerReviewsStream source, computes their sentiment scores, and outputs the results to the SentimentAnalysisStream sink. Additionally, it calculates the average sentiment per product and updates the AverageSentimentTable.

The customerReviewsSentimentAnalysis query reads each incoming event from CustomerReviewsStream, calls sentiment:getRate(review) to compute the review's sentiment score, and inserts the productId, timestamp, username, review, and computed sentimentScore into SentimentAnalysisStream.

The averageSentimentPerProduct query reads SentimentAnalysisStream through a tumbling window of length 5, so it works on batches of five events at a time. Within each batch, it groups the events by productId, calculates the average sentiment score for each product, and inserts the productId (as _key) and avgSentiment into AverageSentimentTable.
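The following Python sketch models a tumbling length window with grouping. It assumes the window counts five events across all products (not five per product) and emits one average per productId once a batch is full; tumbling_length_averages is a hypothetical helper, not part of any Macrometa API.

```python
from collections import defaultdict


def tumbling_length_averages(events, length=5):
    """Yield {productId: average score} for each full batch of `length` events.

    events is an iterable of (productId, sentimentScore) pairs. A trailing
    partial batch produces no output, because the batch never fills.
    """
    batch = []
    for event in events:
        batch.append(event)
        if len(batch) == length:
            scores_by_product = defaultdict(list)
            for product_id, score in batch:
                scores_by_product[product_id].append(score)
            yield {pid: sum(s) / len(s) for pid, s in scores_by_product.items()}
            batch = []  # tumbling: the next batch starts empty


events = [
    ("p1", 4), ("p2", -2), ("p1", 2), ("p2", -4), ("p1", 0),  # first full batch
    ("p2", 3), ("p1", -1),                                     # partial batch
]
for averages in tumbling_length_averages(events):
    print(averages)  # {'p1': 2.0, 'p2': -3.0}
```

In this model, a product that appears only once in a batch gets an average based on that single review, so small batches can produce noisy averages.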

The averageSentimentPerProductUpdate query calculates the same per-product average over the same tumbling window of length 5. Instead of inserting new records, it uses the UPDATE action to overwrite the existing record in AverageSentimentTable whose _key matches the productId.
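The difference between the two table actions can be modeled in Python with a dictionary keyed by _key. This is a sketch under stated assumptions: insert_row and update_row are hypothetical helpers, and the choice to leave an existing row unchanged when an insert hits an existing key is an assumption, since the source does not say how the table handles that case.

```python
def insert_row(table, key, avg_sentiment):
    """Model of INSERT INTO: add a row for a key the table does not hold yet.

    Assumption: an insert for an existing key leaves the stored row unchanged.
    """
    if key in table:
        return False
    table[key] = avg_sentiment
    return True


def update_row(table, key, avg_sentiment):
    """Model of UPDATE ... ON: change a row only if the key already matches."""
    if key not in table:
        return False
    table[key] = avg_sentiment
    return True


table = {}
print(update_row(table, "p1", 2.0))   # False: nothing to update yet
print(insert_row(table, "p1", 2.0))   # True: first average for p1 is stored
print(update_row(table, "p1", -0.5))  # True: a later batch overwrites the row
print(table)                          # {'p1': -0.5}
```

Under these assumptions, the insert covers the first batch in which a product appears, and the update keeps the stored average current for every batch after that.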