feed.core package

Submodules

feed.core.accessors module

class feed.core.accessors.CachedAccessor(name: str, accessor)[source]

Bases: object

Custom property-like object.

A descriptor for caching accessors.

Parameters:
  • name (str) – Namespace that will be accessed under, e.g. df.foo.
  • accessor (cls) – Class with the extension methods.

References

[1]https://github.com/pandas-dev/pandas/blob/v1.1.0/pandas/core/accessor.py#L285-L289

feed.core.base module

class feed.core.base.Constant(value, dtype: str = None)[source]

Bases: feed.core.base.Stream

A stream that generates a constant value.

forward()[source]

Generates the next value from the underlying data streams.

Returns:The next value in the stream.
Return type:T
generic_name = 'constant'
has_next()[source]

Checks if there is another value.

Returns:If there is another value or not.
Return type:bool
class feed.core.base.Group[source]

Bases: feed.core.base.Stream

A stream that groups together other streams into a dictionary.

forward() → Dict[T][source]

Generates the next value from the underlying data streams.

Returns:The next value in the stream.
Return type:T
has_next() → bool[source]

Checks if there is another value.

Returns:If there is another value or not.
Return type:bool
class feed.core.base.IterableStream(source: Iterable[T], dtype: str = None)[source]

Bases: feed.core.base.Stream

A private class used the Stream class for creating data sources.

Parameters:
  • source (Iterable[T]) – The iterable to be used for providing the data.
  • dtype (str, optional) – The data type of the source.
forward() → T[source]

Generates the next value from the underlying data streams.

Returns:The next value in the stream.
Return type:T
generic_name = 'stream'
has_next()[source]

Checks if there is another value.

Returns:If there is another value or not.
Return type:bool
reset()[source]

Resets all the listeners of the stream.

class feed.core.base.NameSpace(name: str)[source]

Bases: feed.core.base.Named

A class providing a context in which to create names.

This becomes useful in cases where Named object would like to use the same name in a different context. In order to resolve naming conflicts in a DataFeed, this class provides a way to solve it.

Parameters:name (str) – The name for the NameSpace.
class feed.core.base.Named(name: str = None)[source]

Bases: object

A class for controlling the naming of objects.

The purpose of this class is to control the naming of objects with respect to the NameSpace to which they belong to. This prevents conflicts that arise in the naming of similar objects under different contexts.

Parameters:name (str, optional) – The name of the object.
name

The name of the object.

Type:str, optional
generic_name = 'generic'
names = {}
namespaces = []
rename(name: str, sep: str = ':/') → feed.core.base.Named[source]

Renames the instance with respect to the current NameSpace.

Parameters:
  • name (str) – The new name to give to the instance.
  • sep (str) – The separator to put between the name of the NameSpace and the new name of the instance (e.g. ns:/example).
Returns:

The instance that was renamed.

Return type:

Named

class feed.core.base.Observable[source]

Bases: object

An object with some value that can be observed.

An object to which a listener can be attached to and be alerted about on an event happening.

listeners

A list of listeners that the object will alert on events occurring.

Type:list of listeners
attach(listener)[source]

Adds a listener to receive alerts.

detach(listener)[source]

Removes a listener from receiving alerts.

attach(listener) → feed.core.base.Observable[source]

Adds a listener to receive alerts.

Parameters:listener (a listener object) –
Returns:The observable being called.
Return type:Observable
detach(listener) → feed.core.base.Observable[source]

Removes a listener from receiving alerts.

Parameters:listener (a listener object) –
Returns:The observable being called.
Return type:Observable
class feed.core.base.Placeholder(dtype: str = None)[source]

Bases: feed.core.base.Stream

A stream that acts as a placeholder for data to be provided at later date.

forward() → T[source]

Generates the next value from the underlying data streams.

Returns:The next value in the stream.
Return type:T
generic_name = 'placeholder'
has_next() → bool[source]

Checks if there is another value.

Returns:If there is another value or not.
Return type:bool
push(value: T) → None[source]
reset() → None[source]

Resets all the listeners of the stream.

class feed.core.base.Sensor(obj, func, dtype=None)[source]

Bases: feed.core.base.Stream

A stream that watches and generates from a particular object.

forward() → T[source]

Generates the next value from the underlying data streams.

Returns:The next value in the stream.
Return type:T
generic_name = 'sensor'
has_next()[source]

Checks if there is another value.

Returns:If there is another value or not.
Return type:bool
class feed.core.base.Stream(name: str = None, dtype: str = None)[source]

Bases: typing.Generic, feed.core.base.Named, feed.core.base.Observable

A class responsible for creating the inputs necessary to work in a DataFeed.

Parameters:
  • name (str, optional) – The name fo the stream.
  • dtype (str, optional) – The data type of the stream.
source(iterable, dtype=None)[source]

Creates a stream from an iterable.

group(streams)[source]

Creates a group of streams.

sensor(obj, func, dtype=None)[source]

Creates a stream from observing a value from an object.

select(streams, func)[source]

Selects a stream satisfying particular criteria from a list of streams.

constant(value, dtype)[source]

Creates a stream to generate a constant value.

asdtype(dtype)

Converts the data type to dtype.

accumulate(*args, **kwargs)
apply(*args, **kwargs)
astype(dtype: str) → feed.core.base.Stream[~T][T][source]

Converts the data type to dtype.

Parameters:dtype (str) – The data type to be converted to.
Returns:The same stream with the new underlying data type dtype.
Return type:Stream[T]
bool

alias of feed.api.boolean.BooleanMethods

static constant(value: T, dtype: str = None) → feed.core.base.Stream[~T][T][source]

Creates a stream to generate a constant value.

Parameters:
  • value (T) – The constant value to be streamed.
  • dtype (str, optional) – The data type of the value.
Returns:

A stream of the constant value.

Return type:

Stream[T]

copy(*args, **kwargs)
static extend_instance(instance: feed.core.base.Stream[~T][T], mixin: feed.core.mixins.DataTypeMixin) → feed.core.base.Stream[~T][T][source]

Apply mix-ins to a class instance after creation.

Parameters:
  • instance (Stream[T]) – An instantiation of Stream to be injected with mixin methods.
  • mixin (DataTypeMixin) – The mixin holding the methods to be injected into the instance.
Returns:

The instance with the injected methods provided by the mixin.

Return type:

Stream[T]

float

alias of feed.api.float.FloatMethods

forward() → T[source]

Generates the next value from the underlying data streams.

Returns:The next value in the stream.
Return type:T
freeze(*args, **kwargs)
gather() → List[Tuple[feed.core.base.Stream, feed.core.base.Stream]][source]

Gathers all the edges of the DAG connected in ancestry with this stream.

Returns:The list of edges connected through ancestry to this stream.
Return type:List[Tuple[Stream, Stream]]
generic_name = 'stream'
static group(streams: List[feed.core.base.Stream[~T][T]]) → feed.core.base.Stream[dict][dict][source]

Creates a group of streams.

Parameters:streams (List[Stream[T]]) – Streams to be grouped together.
Returns:A stream of dictionaries with each stream as a key/value in the dictionary being generated.
Return type:Stream[dict]
has_next() → bool[source]

Checks if there is another value.

Returns:If there is another value or not.
Return type:bool
lag(*args, **kwargs)
placeholder() → feed.core.base.Stream[~T][T][source]

Creates a placholder stream for data to provided to at a later date.

Parameters:dtype (str) – The data type that will be provided.
Returns:A stream representing a placeholder.
Return type:Stream[T]
reduce(*args, **kwargs)
classmethod register_accessor(name: str)[source]

A class decorator that registers an accessor providing useful methods for a particular data type..

Sets the data type accessor to be an attribute of this class.

Parameters:name (str) – The name of the data type.
classmethod register_generic_method(names: List[str])[source]

A function decorator that registers the decorated function with the names provided as a method to the Stream class.

These methods can be used for any instance of Stream.

Parameters:names (List[str]) – The list of names to be used as aliases for the same method.
classmethod register_mixin(dtype: str)[source]

A class decorator the registers a data type mixin providing useful methods directly to the instance of the class.

Parameters:dtype (str) – The name of the data type the mixin is being registered for.
reset() → None[source]

Resets all the listeners of the stream.

run() → None[source]

Runs the underlying streams once and iterates forward.

static select(streams: List[feed.core.base.Stream[~T][T]], func: Callable[[feed.core.base.Stream[~T][T]], bool]) → feed.core.base.Stream[~T][T][source]

Selects a stream satisfying particular criteria from a list of streams.

Parameters:
  • streams (List[Stream[T]]) – A list of streams to select from.
  • func (Callable[[Stream[T]], bool]) – The criteria to be used for finding the particular stream.
Returns:

The particular stream being selected.

Return type:

Stream[T]

Raises:

Exception – Raised if no stream is found to satisfy the given criteria.

static sensor(obj: Any, func: Callable[[Any], T], dtype: str = None) → feed.core.base.Stream[~T][T][source]

Creates a stream from observing a value from an object.

Parameters:
  • obj (Any) – An object to observe values from.
  • func (Callable[[Any], T]) – A function to extract the data to be observed from the object being watched.
  • dtype (str, optional) – The data type of the stream.
Returns:

The stream of values being observed from the object.

Return type:

Stream[T]

static source(iterable: Iterable[T], dtype: str = None) → feed.core.base.Stream[~T][T][source]

Creates a stream from an iterable.

Parameters:
  • iterable (Iterable[T]) – The iterable to create the stream from.
  • dtype (str, optional) – The data type of the stream.
Returns:

The stream with the data type dtype created from iterable.

Return type:

Stream[T]

str

alias of feed.api.string.StringMethods

static toposort(edges: List[Tuple[feed.core.base.Stream, feed.core.base.Stream]]) → List[feed.core.base.Stream][source]

Sorts the order in which streams should be run.

Parameters:edges (List[Tuple[Stream, Stream]]) – The connections that have been found in the DAG.
Returns:The list of streams sorted with respect to the order in which they should be run.
Return type:List[Stream]
warmup(*args, **kwargs)

feed.core.feed module

class feed.core.feed.DataFeed(streams: List[feed.core.base.Stream])[source]

Bases: feed.core.base.Stream

A stream the compiles together streams to be run in an organized manner.

Parameters:streams (List[Stream]) – A list of streams to be used in the data feed.
compile() → None[source]

Compiles all the given stream together.

Organizes the order in which streams should be run to get valid output.

forward() → dict[source]

Generates the next value from the underlying data streams.

Returns:The next value in the stream.
Return type:T
has_next() → bool[source]

Checks if there is another value.

Returns:If there is another value or not.
Return type:bool
next() → dict[source]
reset() → None[source]

Resets all the listeners of the stream.

run() → None[source]

Runs all the streams in processing order.

class feed.core.feed.PushFeed(streams: List[feed.core.base.Stream])[source]

Bases: feed.core.feed.DataFeed

A data feed for working with live data in an online manner.

All sources of data to be used with this feed must be a Placeholder. This ensures that the user can wait until all of their data has been loaded for the next time step.

Parameters:streams (List[Stream]) – A list of streams to be used in the data feed.
is_loaded
next() → dict[source]
push(data: dict) → dict[source]

Generates the values from the data feed based on the values being provided in data.

Parameters:data (dict) – The data to be pushed to each of the placholders in the feed.
Returns:The next data point generated from the feed based on data.
Return type:dict

feed.core.methods module

class feed.core.methods.Methods(stream: Stream)[source]

Bases: object

A class used to hold the accessor methods for a particular data type.

Parameters:stream ("Stream") – The stream to injected with the method accessor.
classmethod register_method(func: Callable, names: List[str])[source]

Injects an accessor into a specific stream instance.

Parameters:
  • func (Callable) – The function to be injected as an accessor method.
  • names (List[str]) – The names to be given to the function.

feed.core.mixins module

class feed.core.mixins.DataTypeMixin[source]

Bases: object

classmethod register_method(func: Callable, names: List[str])[source]

Injects methods into a specific stream instance.

Parameters:
  • func (Callable) – The function to be injected as a method.
  • names (List[str]) – The names to be given to the function.

feed.core.operators module

class feed.core.operators.Accumulator(func: Callable[[T, T], T], dtype: str = None)[source]

Bases: feed.core.base.Stream

An operator stream that accumulates values of a given stream.

Parameters:
  • func (Callable[[T,T], T]) – An accumulator function.
  • dtype (str) – The data type of accumulated value.
forward()[source]

Generates the next value from the underlying data streams.

Returns:The next value in the stream.
Return type:T
has_next() → bool[source]

Checks if there is another value.

Returns:If there is another value or not.
Return type:bool
reset() → None[source]

Resets all the listeners of the stream.

class feed.core.operators.Apply(func: Callable[[T], K], dtype: str = None)[source]

Bases: feed.core.base.Stream

An operator stream that applies a specific function to the values of a given stream.

Parameters:
  • func (Callable[[T], …]) – A function to be applied to the values of a stream.
  • dtype (str, optional) – The data type of the values after function is applied.
forward() → K[source]

Generates the next value from the underlying data streams.

Returns:The next value in the stream.
Return type:T
has_next() → bool[source]

Checks if there is another value.

Returns:If there is another value or not.
Return type:bool
class feed.core.operators.BinOp(op: Callable[[T, T], T], dtype: str = None)[source]

Bases: feed.core.base.Stream

A stream operator that combines the values of two given streams into one value of the same type.

Parameters:
  • op (Callable[[T, T], T]) – The binary operation to be applied.
  • dtype (str, optional) – The data type of the stream.
forward() → T[source]

Generates the next value from the underlying data streams.

Returns:The next value in the stream.
Return type:T
generic_name = 'bin_op'
has_next() → bool[source]

Checks if there is another value.

Returns:If there is another value or not.
Return type:bool
class feed.core.operators.Copy[source]

Bases: feed.core.base.Stream

A stream operator that copies the values of a given stream.

forward() → T[source]

Generates the next value from the underlying data streams.

Returns:The next value in the stream.
Return type:T
generic_name = 'copy'
has_next() → bool[source]

Checks if there is another value.

Returns:If there is another value or not.
Return type:bool
class feed.core.operators.Freeze[source]

Bases: feed.core.base.Stream

A stream operator that freezes the value of a given stream and generates that value.

forward() → T[source]

Generates the next value from the underlying data streams.

Returns:The next value in the stream.
Return type:T
generic_name = 'freeze'
has_next() → bool[source]

Checks if there is another value.

Returns:If there is another value or not.
Return type:bool
reset() → None[source]

Resets all the listeners of the stream.

class feed.core.operators.Lag(lag: int = 1, dtype: str = None)[source]

Bases: feed.core.base.Stream

An operator stream that returns the lagged value of a given stream.

Parameters:
  • lag (int) – The number of steps to lag behind by
  • dtype (str, optional) – The data type of the stream
forward() → T[source]

Generates the next value from the underlying data streams.

Returns:The next value in the stream.
Return type:T
generic_name = 'lag'
has_next() → bool[source]

Checks if there is another value.

Returns:If there is another value or not.
Return type:bool
reset() → None[source]

Resets all the listeners of the stream.

Module contents