Merging streams

Estimated time: 25–35 minutes

In this tutorial, you will use Subscriber blocks to connect to the signals emitted by the Publisher blocks you configured in the tweets-count service in the previous tutorial. This pattern of communication between services is called publish/subscribe, or pub/sub. You will subscribe to two separate streams of signals from two separate topics, combine these streams into one, and then analyze the data to identify trends.


Create service

This service will subscribe to signals that contain the data returned from Twitter along with a cumulative_count attribute, compare the number of tweets mentioning each company, and publish the name of the company with the most tweets.

  1. Click create new service.
  2. In the service name box, enter tweets-count-winner.

[info] Why a new service?

It is standard practice to create a new service when you are ready to complete a new task. When the signal in a service gets to a point where you might want to reuse that data in multiple places, that service is complete.


Add and configure Subscriber blocks

Two Subscriber blocks are going to subscribe to the topics published by the Publisher blocks in the previous tweets-count service.

  1. Drag a Subscriber block onto your canvas.
  2. Configure the block as follows:
    1. Name: Subscribe Tesla Tweets
    2. Topic: workshop.tweets-count.tesla
  3. Click accept.
  4. Drag another Subscriber block onto your canvas.
  5. Configure the block as follows:
    1. Name: Subscribe Netflix Tweets
    2. Topic: workshop.tweets-count.netflix
  6. Click accept.

As soon as you configure the topic in a Subscriber block to match a topic in a Publisher block, a blue dot will appear on the top of the Subscriber block.

  1. Click the filled-in blue dot on the Subscribe Tesla Tweets block.
  2. Click the tweets-count service in the pop-up window.
  3. Click split in page.
    • Adjust your zoom settings by clicking the - or + buttons at the bottom of your canvas so you can see all of the blocks in each service.
  4. Click the tweets-count-winner service in the list on the left to return to the single service view.

The split view shows you how data is being shared between two services.

In previous tutorials you used simulator and generator blocks to drive your services; now you are using Subscriber blocks to stream signals into the service. Signals published by another service flow into this service through the Subscriber blocks.

[info] Pub/sub in the nio System Designer

Notice that the blue dots on the top edge of the Subscriber blocks are filled in with an arrow in the center. If you do not see input terminals on your Subscriber blocks, check that the subscriber topic exactly matches the topic in the corresponding Publisher block. If the topics don't match, there won't be incoming signals and the input terminal with the arrow will not appear.

When you click on one of those filled-in dots, you are shown a list of services publishing to that topic. If you click on the tweets-count service name, two options are available: open service or split in page.

If you choose open service, you will jump to that service. If you choose split in page, your screen will split: the top half shows the service you are currently viewing, and the bottom half shows the service that is publishing to your subscriber.
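The exact-match rule for topics can be illustrated with a toy in-memory bus. This is a minimal Python sketch, not how nio implements pub/sub: the TopicBus class and its methods are hypothetical, and the bus only does exact string matching (it does not model any wildcard features the platform may have).

```python
from collections import defaultdict
from typing import Callable

Handler = Callable[[dict], None]


class TopicBus:
    """Hypothetical toy pub/sub bus: a subscriber only receives signals
    published to a topic string that matches its own exactly."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, signal: dict) -> int:
        # .get() does not create an entry, so unknown topics have no listeners.
        handlers = self._subscribers.get(topic, [])
        for handler in handlers:
            handler(signal)
        return len(handlers)  # number of subscribers that received the signal


bus = TopicBus()
received = []
bus.subscribe("workshop.tweets-count.tesla", received.append)

bus.publish("workshop.tweets-count.tesla", {"cumulative_count": 1})
# A one-character typo in the topic means nobody is listening.
bus.publish("workshop.tweet-count.tesla", {"cumulative_count": 2})
print(received)  # [{'cumulative_count': 1}]
```

The second signal is silently dropped, which is the same symptom you see in the designer when a Subscriber block's dot never fills in.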


Label separate cumulative tweets count

The output signals of both Subscriber blocks in this service have attributes with the same names, so you would not be able to tell the Tesla cumulative_count attribute apart from the Netflix cumulative_count attribute. Because a signal cannot have two attributes with the same name, when the MergeStreams block joins the two streams, the attributes from the signal arriving on INPUT_1 will be overwritten by the same-named attributes from the signal arriving on INPUT_2. In this step you will reformat the two streams so the MergeStreams block keeps the attributes you want to compare separate. One way to do this is to use Modifier blocks to nest each stream's attributes under a different label.
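To see why identical attribute names are a problem, here is a minimal Python sketch that models signals as plain dictionaries. It assumes the merge behaves like a dictionary update where the later input wins, as the paragraph above describes; naive_merge is a hypothetical helper, not the MergeStreams block's code, and the sample values are made up.

```python
def naive_merge(input_1: dict, input_2: dict) -> dict:
    """Combine two signals; same-named keys from input_2 overwrite input_1."""
    merged = dict(input_1)
    merged.update(input_2)
    return merged


tesla_signal = {"text": "Tesla tweet", "cumulative_count": 12}
netflix_signal = {"text": "Netflix tweet", "cumulative_count": 7}

merged = naive_merge(tesla_signal, netflix_signal)
print(merged)  # {'text': 'Netflix tweet', 'cumulative_count': 7}
```

The Tesla count of 12 is gone after the merge, so there is nothing left to compare.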

  1. Drag a Modifier block onto your canvas.
  2. Configure the block as follows:
    1. Name: Label Tesla Data
    2. Exclude Existing Fields?: True (select the radio button)
    3. Click + Fields and enter the following values:
      • Attribute Name: tesla
      • Attribute Value: {{ $.to_dict() }}
  3. Click accept.
  4. Drag a second Modifier block onto your canvas.
  5. Configure the block as follows:
    1. Name: Label Netflix Data
    2. Exclude Existing Fields?: True (select the radio button)
    3. Click + Fields and enter the following values:
      • Attribute Name: netflix
      • Attribute Value: {{ $.to_dict() }}
  6. Click accept.

[info] The $.to_dict()

Setting the Attribute Value to {{ $.to_dict() }} nests the entire incoming signal inside one new attribute: on the new signal, tesla (or netflix) is the key and the incoming signal, $, is the value. The signal must be converted to a dictionary ($.to_dict()) so that later blocks can access the values embedded in it.
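The nesting performed by the Label Tesla Data and Label Netflix Data blocks can be modeled in a few lines of Python. This is a sketch under assumptions: the Signal class below is a hypothetical stand-in (attributes plus a to_dict() method), not the nio signal class, and label_signal only imitates a Modifier block with Exclude Existing Fields? set to True and a single field.

```python
class Signal:
    """Hypothetical stand-in for a signal: attributes plus a to_dict() method."""

    def __init__(self, **attributes):
        self.__dict__.update(attributes)

    def to_dict(self) -> dict:
        return dict(self.__dict__)


def label_signal(signal: Signal, label: str) -> Signal:
    """Imitate a Modifier block with Exclude Existing Fields? = True:
    the new signal has only `label`, whose value is the old signal as a dict."""
    return Signal(**{label: signal.to_dict()})


incoming = Signal(text="Tesla tweet", cumulative_count=12)
labeled = label_signal(incoming, "tesla")
print(labeled.to_dict())
# {'tesla': {'text': 'Tesla tweet', 'cumulative_count': 12}}
print(labeled.tesla["cumulative_count"])  # 12
```

Because the original attributes are excluded, the labeled signal has exactly one top-level attribute, which is what lets the two streams merge without collisions.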


Merge company count information

The Netflix and Tesla tweets are streaming in from two different Subscriber blocks. Now you can combine them into one stream in order to compare the cumulative_count attribute on each to figure out which company is being mentioned more frequently on Twitter.

  1. Drag a MergeStreams block onto your canvas.
  2. Configure the block as follows:
    1. Name: Merge Tweets Count
    2. Load from Persistence: False (deselect the radio button)
    3. Notify Once: False (deselect the radio button)
  3. Click accept.

[info] Notify Once?

When Notify Once? is True (selected), each cached input signal is used in only one merged output signal and is then cleared from the block. When the value is False (deselected), the latest signal from each input stays cached in the block until a new signal arrives on that input to replace it, so it can be merged again with newer signals from the other input.

Now each signal contains a tesla object and a netflix object and inside each object is a cumulative_count attribute whose value corresponds to the number of tweets that mention each company.
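The caching behavior described in the Notify Once? callout can be sketched as a small Python class. MergeModel is hypothetical and simplified; in particular, it assumes a merged signal is emitted only once both inputs hold a cached signal, which is an assumption about the block rather than something this tutorial states.

```python
from typing import Optional


class MergeModel:
    """Hypothetical two-input merge with a notify_once switch.

    Assumes a merged signal is emitted only once both inputs hold a cached signal.
    """

    def __init__(self, notify_once: bool = False) -> None:
        self.notify_once = notify_once
        self.cache: dict = {"input_1": None, "input_2": None}

    def process(self, signal: dict, input_id: str) -> Optional[dict]:
        # The newest signal on an input replaces whatever was cached there.
        self.cache[input_id] = signal
        if self.cache["input_1"] is None or self.cache["input_2"] is None:
            return None  # still waiting for a signal on the other input
        merged = {**self.cache["input_1"], **self.cache["input_2"]}
        if self.notify_once:
            # Each cached signal contributes to only one merged output.
            self.cache = {"input_1": None, "input_2": None}
        return merged


merge = MergeModel(notify_once=False)
print(merge.process({"tesla": {"cumulative_count": 1}}, "input_1"))  # None
print(merge.process({"netflix": {"cumulative_count": 1}}, "input_2"))
# {'tesla': {'cumulative_count': 1}, 'netflix': {'cumulative_count': 1}}
print(merge.process({"tesla": {"cumulative_count": 2}}, "input_1"))
# {'tesla': {'cumulative_count': 2}, 'netflix': {'cumulative_count': 1}}
```

With Notify Once? set to False, the third call still produces a merged signal because the Netflix signal stays cached; with it set to True, that call would return None until a new Netflix signal arrived.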


Compare tweet counts

You need to compare the value of tesla['cumulative_count'] with netflix['cumulative_count'] to determine which company is currently trending on Twitter. The Filter block evaluates its condition on each incoming signal: if the condition evaluates to True, the signal is emitted from the TRUE terminal; otherwise, it is emitted from the FALSE terminal. The Filter block does not change the signals; they leave exactly as they arrived, only through different terminals.

  1. Drag a Filter block onto your canvas.
  2. Configure the block as follows:
    1. Name: Is Tesla Trending
    2. Click + Filter Conditions
    3. Condition: {{ $tesla['cumulative_count'] >= $netflix['cumulative_count'] }}
  3. Click accept.

[info] Nested values

The cumulative_count value nested in both $tesla and $netflix can be accessed with bracket notation, for example $netflix['cumulative_count'].

Signals where the Tesla count is greater than or equal to the Netflix count will now flow out of the TRUE terminal.
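The routing done by the Is Tesla Trending block can be mirrored in plain Python, treating the merged signal as a nested dictionary. This is an illustrative sketch only; is_tesla_trending and route are hypothetical helpers, and the counts are made up.

```python
def is_tesla_trending(signal: dict) -> bool:
    """Same comparison as {{ $tesla['cumulative_count'] >= $netflix['cumulative_count'] }}."""
    return signal["tesla"]["cumulative_count"] >= signal["netflix"]["cumulative_count"]


def route(signal: dict) -> tuple[str, dict]:
    """Return the output terminal name and the unchanged signal, like a Filter block."""
    terminal = "TRUE" if is_tesla_trending(signal) else "FALSE"
    return terminal, signal


merged = {"tesla": {"cumulative_count": 5}, "netflix": {"cumulative_count": 5}}
print(route(merged)[0])  # TRUE (a tie goes to Tesla because of >=)
merged["netflix"]["cumulative_count"] = 6
print(route(merged)[0])  # FALSE
```

Note that the signal object itself is passed through untouched; only the chosen terminal differs.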


It is good practice to format and clearly label the final signal that is emitted from this service. In this case, you want to know the winner, the text of the latest tweet, when the tweet was sent, and how many tweets the company has received so far.

  1. Drag a Modifier block onto your canvas.
  2. Configure the block as follows:
    1. Name: Label Tesla Winning
    2. Exclude Existing Fields?: True (select the radio button)
    3. Click + Fields four times.
    4. Create a winner attribute with the value of Tesla:
      • Attribute Name box 1: winner
      • Attribute Value box 1: Tesla
    5. Extract the text attribute containing the text of the tweet from within the tesla label:
      • Attribute Name box 2: text
      • Attribute Value box 2: {{ $tesla['text'] }}
    6. Extract the created_at attribute containing the timestamp of when the tweet was posted from within the tesla label:
      • Attribute Name box 3: created_at
      • Attribute Value box 3: {{ $tesla['created_at'] }}
    7. Extract the cumulative_count attribute and rename it to count:
      • Attribute Name box 4: count
      • Attribute Value box 4: {{ $tesla['cumulative_count'] }}
  3. Click accept.
  4. Drag a second Modifier block onto your canvas.
  5. Configure the block as follows:
    1. Name: Label Netflix Winning
    2. Exclude Existing Fields?: True (select the radio button)
    3. Click + Fields four times.
    4. Create a winner attribute with the value of Netflix:
      • Attribute Name box 1: winner
      • Attribute Value box 1: Netflix
    5. Extract the text attribute containing the text of the tweet from within the netflix label:
      • Attribute Name box 2: text
      • Attribute Value box 2: {{ $netflix['text'] }}
    6. Extract the created_at attribute containing the timestamp of when the tweet was posted from within the netflix label:
      • Attribute Name box 3: created_at
      • Attribute Value box 3: {{ $netflix['created_at'] }}
    7. Extract the cumulative_count attribute and rename it to count:
      • Attribute Name box 4: count
      • Attribute Value box 4: {{ $netflix['cumulative_count'] }}
  6. Click accept.
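The four fields configured in Label Tesla Winning and Label Netflix Winning amount to flattening one nested object into the final signal. The following Python sketch assumes the merged signal is a nested dictionary; label_winner is a hypothetical helper, and the timestamps and counts are made-up sample values (the real created_at format comes from Twitter and is not specified here).

```python
def label_winner(signal: dict, company: str) -> dict:
    """Imitate Label Tesla Winning / Label Netflix Winning with
    Exclude Existing Fields? = True. `company` is the display name;
    its lowercase form is assumed to be the nested attribute name."""
    nested = signal[company.lower()]
    return {
        "winner": company,
        "text": nested["text"],
        "created_at": nested["created_at"],
        "count": nested["cumulative_count"],  # renamed from cumulative_count
    }


merged = {
    "tesla": {"text": "Tesla tweet", "created_at": "2017-01-01 00:00:00", "cumulative_count": 9},
    "netflix": {"text": "Netflix tweet", "created_at": "2017-01-01 00:00:05", "cumulative_count": 4},
}
print(label_winner(merged, "Tesla"))
# {'winner': 'Tesla', 'text': 'Tesla tweet', 'created_at': '2017-01-01 00:00:00', 'count': 9}
```

Because existing fields are excluded, the losing company's data does not appear in the published signal.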

Each service should end by sending some type of output or notification somewhere. This service ends by publishing the data to a topic that other services in the system can subscribe to.

  1. Drag a Publisher block onto your canvas.
  2. Configure the block as follows:
    1. Name: Publish Tweets Winner
    2. Topic: workshop.tweets-count.winner
  3. Click accept.

Connect blocks and start service

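The Twitter data streams into this service through the two Subscriber blocks. The Label Tesla Data and Label Netflix Data blocks nest each stream under its own label, the Merge Tweets Count block merges the two streams into one, and the Is Tesla Trending block determines which company is trending. The Label Tesla Winning or Label Netflix Winning block then formats the outgoing signal, and the Publish Tweets Winner block publishes it.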

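  1. Connect the blocks as follows:
    • Subscribe Tesla Tweets → Label Tesla Data, and Subscribe Netflix Tweets → Label Netflix Data.
    • Label Tesla Data → one input of Merge Tweets Count, and Label Netflix Data → the other input.
    • Merge Tweets Count → Is Tesla Trending.
    • The TRUE terminal of Is Tesla Trending → Label Tesla Winning, and the FALSE terminal → Label Netflix Winning.
    • Label Tesla Winning and Label Netflix Winning → Publish Tweets Winner, and also → a Logger block (drag one onto your canvas if you have not already) so you can see the results.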
  2. Click save.
  3. Click start.
  4. Open the logger panel to see tweets from the trending company.

[info] Multiple connections

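In this configuration, both the Label Tesla Winning and Label Netflix Winning blocks stream into the same Publisher block and the same Logger block. Multiple connections like these are not a problem: a block can receive signals from several blocks, and a block's output can feed several blocks.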

[warning] Nothing is logging

If you are not seeing any logs in the logger panel, check your Twitter App credentials in the instance configuration. For example, the tweets-count service will not receive tweets if any of those configuration fields has whitespace at either end.

If you have checked your instance environment variables for correct configuration and are still not seeing logs in the logger panel, visit troubleshooting for other possible solutions.
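Stray whitespace in pasted credentials is easy to miss by eye. This short Python sketch checks a dictionary of settings for values with leading or trailing whitespace; the setting names and values are hypothetical placeholders, not the actual names used in your instance configuration.

```python
def find_padded_values(config: dict) -> list:
    """Return the names of settings whose string values have leading or trailing whitespace."""
    return sorted(
        name
        for name, value in config.items()
        if isinstance(value, str) and value != value.strip()
    )


# Hypothetical setting names and placeholder values.
credentials = {
    "CONSUMER_KEY": "abc123 ",  # trailing space, easy to paste by accident
    "CONSUMER_SECRET": "def456",
    "OAUTH_TOKEN": "\tghi789",  # leading tab
    "OAUTH_TOKEN_SECRET": "jkl012",
}
print(find_padded_values(credentials))  # ['CONSUMER_KEY', 'OAUTH_TOKEN']
```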

This service follows a very common pattern: it subscribes to data, manipulates that data by adding attributes and merging streams, uses conditional statements to route the data into separate flows, and then sends the relevant results of that logic in a signal to a Publisher block.
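The whole pattern can be summarized as one small Python pipeline: label each stream, merge, compare, format, and publish. This is a simplified sketch with hypothetical helpers; it models signals as dictionaries, stands in a plain list for the workshop.tweets-count.winner topic, assumes a merged signal is emitted only once both streams have produced a signal, and uses made-up data.

```python
from typing import Optional


def label(signal: dict, name: str) -> dict:
    # Label Tesla Data / Label Netflix Data: nest the whole signal under one key.
    return {name: dict(signal)}


class Merge:
    # Merge Tweets Count with Notify Once? = False: keep the latest signal per label.
    def __init__(self) -> None:
        self.latest: dict = {}

    def process(self, signal: dict) -> Optional[dict]:
        self.latest.update(signal)  # labels differ, so nothing is overwritten
        if "tesla" in self.latest and "netflix" in self.latest:
            return dict(self.latest)
        return None


def format_winner(merged: dict) -> dict:
    # Is Tesla Trending, then Label Tesla Winning or Label Netflix Winning.
    tesla_wins = merged["tesla"]["cumulative_count"] >= merged["netflix"]["cumulative_count"]
    key, name = ("tesla", "Tesla") if tesla_wins else ("netflix", "Netflix")
    nested = merged[key]
    return {"winner": name, "text": nested["text"],
            "created_at": nested["created_at"], "count": nested["cumulative_count"]}


published = []  # stands in for the workshop.tweets-count.winner topic
merge = Merge()
incoming = [
    ("tesla", {"text": "t1", "created_at": "t1-time", "cumulative_count": 1}),
    ("netflix", {"text": "n1", "created_at": "n1-time", "cumulative_count": 1}),
    ("netflix", {"text": "n2", "created_at": "n2-time", "cumulative_count": 2}),
]
for company, signal in incoming:
    merged = merge.process(label(signal, company))
    if merged is not None:
        published.append(format_winner(merged))

print([s["winner"] for s in published])  # ['Tesla', 'Netflix']
```

The first signal produces no output because the Netflix stream has not arrived yet; after that, every new signal produces a winner.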

You have learned how to drive a service by subscribing to data generated by a different service, how to perform logic with blocks, and how to publish the results of the service's logic to your system.


Extra credit

Here are some ideas to improve or expand the services you just created.

  • How could you reduce the number of winner signals sent by the service? Hint: you need to modify the flow of data; look into the Debounce block.
  • How could you improve the service further to send a signal only when the winner changes? Hint: the StateChange block may be helpful.
  • How would you post a Tweet to your own Twitter account making an announcement every time the winner changes? Hint: look into the TwitterPost block.
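As a starting point for the second idea, the underlying concept of "emit only on change" can be sketched in plain Python. This is not the StateChange block's configuration or code; only_changes is a hypothetical generator that passes a signal through only when its winner value differs from the previous signal's.

```python
from typing import Iterable, Iterator


def only_changes(signals: Iterable[dict], key: str = "winner") -> Iterator[dict]:
    """Yield a signal only when its `key` value differs from the previous signal's."""
    previous = object()  # sentinel that never equals a real value
    for signal in signals:
        if signal[key] != previous:
            previous = signal[key]
            yield signal


winners = [{"winner": w} for w in ["Tesla", "Tesla", "Netflix", "Netflix", "Tesla"]]
print([s["winner"] for s in only_changes(winners)])  # ['Tesla', 'Netflix', 'Tesla']
```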

Getting help

We're always happy to help with any questions you might have about the nio Platform. View the troubleshooting guide, search the documentation, or post your questions in the forum. You can also contact live support by clicking the chat icon in the lower-right corner of the nio System Designer.

Troubleshooting tips related specifically to this tutorial are presented below.

| Error | Possible cause | Solution |
| --- | --- | --- |
| Error 420, Enhance Your Calm | You have reached your Twitter API rate limit. | Stop the tweets-count service for a while and let your rate limit reset. |
| No messages in the Logger panel and no error messages | Services are not started. | Make sure the tweets-count service is running. |
| Input to the Subscriber block is not filled in with an arrow | The topic name does not match any published topic. | Make sure the topic in the Subscriber block exactly matches a topic in a Publisher block. |
