Walkthrough

The feed package provides useful tools when building small scale streaming operations. The primary reason for using this package is to help build the mechanisms that generate observations from an environment. Therefore, it is fitting that their primary location of use is in the Observer component. The Stream API provides the granularity needed to connect specific data sources to the Observer.

What is a Stream?

A Stream is the basic building block for the DataFeed, which is also a stream itself. Each stream has a name and a data type and they can be set after the stream is created. Streams can be created through the following mechanisms:

  • generators
  • iterables
  • sensors
  • direct implementation of Stream

For example, if you wanted to make a stream for a simple counter. We will make it such that it will start at 0 and increment by 1 each time it is called and on reset will set the count back to 0. The following code accomplishes this functionality through creating a generator function.

from feed import Stream

def counter():
    i = 0
    while True:
        yield i
        i += 1

s = Stream.source(counter)

In addition, you can also use the built-in count generator from the itertools package.

from itertools import count

s = Stream.source(count(start=0, step=1))

These will all create infinite streams that will keep incrementing by 1 to infinity. If you wanted to make something that counted until some finite number you can use the built in range function.

s = Stream.source(range(5))

This can also be done by giving in a list directly.

s = Stream.source([1, 2, 3, 4, 5])

The direct approach to stream creation is by sub-classing Stream and implementing the forward, has_next, and reset methods. If the stream does not hold stateful information, then reset is not required to be implemented and can be ignored.

class Counter(Stream):

    def __init__(self):
        super().__init__()
        self.count = None

    def forward(self):
        if self.count is None:
            self.count = 0
        else:
            self.count += 1
        return self.count

    def has_next(self):
        return True

    def reset(self):
        self.count = None

s = Counter()

Using Data Types

The purpose of the data type of a stream, dtype, is to add additional functionality and behavior to a stream such that it can be aggregated with other streams of the same type in an easy and intuitive way.

Since the most common data type is float in these tasks, the following is a list of supported special operations for it:

  • Let s, s1, s2 be streams.
  • Let c be a constant.
  • Let n be a number.
  • Unary:
    • -s, s.neg()
    • abs(s), s.abs()
    • s**2, pow(s, n)
  • Binary:
    • s1 + s2, s1.add(s2), s + c, c + s
    • s1 - s2, s1.sub(s2), s - c, c - s
    • s1 * s2, s1.mul(s2), s * c, c * s
    • s1 / s2, s1.div(s2), s / c, c / s

There are many more useful functions that can be utilized, too many to list in fact. You can find all of the. however, in the API reference section of the documentation.