Skip to content

Tasks

Container tasks

ForkTask

tasker.tasks.containers.ForkTask

The container task contains same tasks with different profiles.

Provided fields in shared will be forked to multiple fields. To process them, you should execute a MapTask instance or a ReduceTask instance.

Examples:

You can execute a ForkTask using a profile file just like the following one.

__schema__ = "tasker.tasks.containers.ForkTask"
worker = 10
reference = "example.ExampleTask"

[[tasks]]
include = false
path = ""
profile = "task1"
execute = true

[task1]
example = "Hello world 1"

[[tasks]]
include = false
path = ""
profile = "task2"
execute = true

[task2]
example = "Hello world 2"

[[tasks]]
include = false
path = ""
profile = "task3"
execute = true

[task3]
example = "Hello world 3"

define() classmethod

Examples:

__schema__ = "tasker.tasks.containers.ForkTask"
worker = 0
reference = ""
[[tasks]]
include = true
path = ""
profile = ""
execute = true

Returns:

Type Description
List[tasker.typedef.Definition]

Schema of profile

Source code in tasker/tasks/containers.py
@classmethod
def define(cls) -> List[Definition]:
    """
    Examples:
        ```toml
        __schema__ = "tasker.tasks.containers.ForkTask"
        worker = 0
        reference = ""
        [[tasks]]
        include = true
        path = ""
        profile = ""
        execute = true
        ```

    Returns:
        Schema of profile
    """
    return [
        value('worker', int),
        value('reference', str),
        value('tasks', list, [
            [
                value('include', bool),
                value('path', str),
                value('profile', str),
                value('execute', bool)
            ]
        ])
    ]

invoke(self, profile, shared, logger)

All activities of a task defined in this method. You can access configurations from profile object, access data from other tasks or provide data to other tasks by using shared. A logger is also provided with a task.

Parameters:

Name Type Description Default
profile Profile

Runtime profile defined in TOML file.

required
shared Storage

Shared storage in the whole lifecycle.

required
logger Logger

The logger named with this Task.

required

Returns:

Type Description
int

The state defined in [Return enumeration][tasker.typedef.Return].

Source code in tasker/tasks/containers.py
def invoke(self, profile: Profile, shared: Storage, logger: Logger) -> int:
    def execute(meta):
        task_cls = import_reference(profile.reference)
        task = task_cls()
        self._register_task(task)
        task_logger_name = f'{profile.reference}[{hex(hash(task))}]@{hex(hash(self))}'
        task_logger = get_logger(task_logger_name)
        task_shared = ForkStorageView(storage=shared.storage(), task=task)
        if meta.include:
            task_profile = Profile.from_toml(filename=meta.path)
        else:
            task_profile = profile[meta.profile]
        state = self._invoke_check(task, task_profile, task_shared, task_logger)
        return \
            state, \
            task, \
            (task_profile, task_shared, task_logger)

    def retry(state: int, task, context):
        if state | Return.RETRY.value:
            return self._invoke_check(task, *context), task, context
        else:
            return state, task, context

    pool = Pool(profile.worker)
    execute_tasks = list(filter(
        lambda meta: meta.execute,
        profile.tasks
    ))
    try:
        results = pool.map(
            func=execute,
            iterable=execute_tasks,
            chunksize=int(len(execute_tasks) // profile.worker)
        )

        while reduce(lambda t, s: t | s[0], results, Return.SUCCESS.value):
            results = pool.map(
                func=lambda args: retry(*args),
                iterable=results,
                chunksize=int(len(execute_tasks) // profile.worker)
            )

    except self.ExitSignal:
        pool.close()
        return Return.ERROR.value | Return.EXIT.value

    pool.close()
    return Return.SUCCESS.value

MapTask

tasker.tasks.containers.MapTask

The container task mapping forked shared data to new fields in shared storage.

Provided fields in shared will be mapped to new fields. You can also combine fields into dependent single field by a ReduceTask instance

define() classmethod

Examples:

__schema__ = "tasker.tasks.containers.MapTask"
worker = 0
reference = ""
include = true
path = ""
profile = ""

Returns:

Type Description
List[tasker.typedef.Definition]

Schema of profile

Source code in tasker/tasks/containers.py
@classmethod
def define(cls) -> List[Definition]:
    """
    Examples:
        ```toml
        __schema__ = "tasker.tasks.containers.MapTask"
        worker = 0
        reference = ""
        include = true
        path = ""
        profile = ""
        ```

    Returns:
        Schema of profile
    """
    return [
        value('worker', int),
        value('reference', str),
        value('include', bool),
        value('path', str),
        value('profile', str),
    ]

invoke(self, profile, shared, logger)

All activities of a task defined in this method. You can access configurations from profile object, access data from other tasks or provide data to other tasks by using shared. A logger is also provided with a task.

Parameters:

Name Type Description Default
profile Profile

Runtime profile defined in TOML file.

required
shared Storage

Shared storage in the whole lifecycle.

required
logger Logger

The logger named with this Task.

required

Returns:

Type Description
int

The state defined in [Return enumeration][tasker.typedef.Return].

Source code in tasker/tasks/containers.py
def invoke(self, profile: Profile, shared: Storage, logger: Logger) -> int:
    def execute(idx: Text):
        task = task_cls()
        self._register_task(task)
        task_storage = self.STORAGE_VIEW_CLASS(storage=shared.storage(), task=task, mirror=idx)
        task_logger_name = f'{profile.reference}[{hex(hash(task))}]@{hex(hash(self))}'
        task_logger = get_logger(task_logger_name)
        return self._invoke_check(task, task_profile, task_storage, task_logger), task, (
            task_profile, task_storage, task_logger
        )

    def retry(state, task, context):
        if state & Return.RETRY.value:
            return self._invoke_check(task, *context), task, context
        else:
            return state, task, context

    task_cls = import_reference(profile.reference)
    pool = Pool(profile.worker)
    if profile.include:
        task_profile = Profile.from_toml(profile.path)
    else:
        task_profile = profile[profile.profile]
    state_map = tuple(pool.map(
        func=execute,
        iterable=shared[self.STORAGE_VIEW_CLASS.META_KEY],
        chunksize=int(len(shared[self.STORAGE_VIEW_CLASS.META_KEY]) // profile.worker)
    ))
    while reduce(lambda t, s: t | s[0], state_map, Return.SUCCESS.value) & Return.RETRY.value:
        state_map = tuple(pool.map(
            func=retry,
            iterable=shared[self.STORAGE_VIEW_CLASS.META_KEY],
            chunksize=int(len(shared[self.STORAGE_VIEW_CLASS.META_KEY]) // profile.worker)
        ))
    return reduce(lambda t, s: t | s[0], state_map, Return.SUCCESS.value)

ReduceTask

tasker.tasks.containers.ReduceTask

The container task reduce sequenced fields in shared data into single dependent field.

You can reuse the key of sequenced fields when writing.

define(cls)

Examples:

__schema__ = "tasker.tasks.containers.ReduceTask"
worker = 0
reference = ""
include = true
path = ""
profile = ""

Returns:

Type Description
List[tasker.typedef.Definition]

Schema of profile

Source code in tasker/tasks/containers.py
def define(cls) -> List[Definition]:
    """
    Examples:
        ```toml
        __schema__ = "tasker.tasks.containers.ReduceTask"
        worker = 0
        reference = ""
        include = true
        path = ""
        profile = ""
        ```

    Returns:
        Schema of profile
    """
    return super(ReduceTask, cls).define()

Task

The container class of a task.

To interactive with other task, you need to implement require, provide methods to declare the fields in shared storage. To implement action of a task, you need to implement invoke method. To define the schema of task profile, you need to implement define method which override from [ProfileMixin class][tasker.mixin.ProfileMixin].

Examples:

We will introduce a example to describe the usage of Task class. For example, we construct a class named ExampleTask in "example.py", so the reference of ExampleTask is example.ExampleTask.

from logging import Logger
from typing import List, Text

from tasker import Definition, Profile, Return, value
from tasker.storage import Storage
from tasker.tasks import Task


class ExampleTask(Task):
    def invoke(self, profile: Profile, shared: Storage, logger: Logger) -> int:
        print('This is an example.')
        print(f'{profile.example}')
        logger.info('Example INFO')
        logger.debug('Example DEBUG')
        logger.warning('Example WARNING')
        logger.error('Example ERROR')
        shared['example'] = 'This is an example'
        return Return.SUCCESS.value

    def require(self) -> List[Text]:
        return []

    def provide(self) -> List[Text]:
        return ['example']

    def remove(self) -> List[Text]:
        return []

    def define(self) -> List[Definition]:
        return [
            value('example', str)
        ]

The profile is configured below named example.toml. When using tasker, a task must be configured with a profile file. You can create profile files separately using include feature, as well as define action of all tasks in one profile file.

__schema__ = "tasker.launcher.Launcher"
__name__ = "Example"
__author__ = "Author"
__version__ = "1.0.0"
__email__ = "author@example.com"
__abstract__ = ""
[__setting__]
    [__setting__.storage]
    reference = "tasker.storage.DictStorage"
    [__setting__.log]
    stdout = true
    level = "DEBUG"
[[__meta__]]
reference = "example.ExampleTask"
include = false
path = ""
profile = "example"
execute = true

[example]
example = "Hello world"

Run the following command.

tasker launch -f example.toml

If it run like this, your first task using tasker has been created successfully.

--------------------
Example (1.0.0)
Author: Author
E-Mail: author@example.com

--------------------

example.ExampleTask[0x2d76307e28]
require:
provide: example
------------------->
This is an example.
Hello world
2020-05-21T11:46:25|INFO|example.ExampleTask[0x2d76307e28]>Example INFO
2020-05-21T11:46:25|DEBUG|example.ExampleTask[0x2d76307e28]>Example DEBUG
2020-05-21T11:46:25|WARNING|example.ExampleTask[0x2d76307e28]>Example WARNING
2020-05-21T11:46:25|ERROR|example.ExampleTask[0x2d76307e28]>Example ERROR
<-------------------
Successfully finished in 0.000998 seconds.

Notes: The console output above is executed under version 0.2.10. Console output of version 0.4.0 has a large modification.

invoke(self, profile, shared, logger)

All activities of a task defined in this method. You can access configurations from profile object, access data from other tasks or provide data to other tasks by using shared. A logger is also provided with a task.

Parameters:

Name Type Description Default
profile Profile

Runtime profile defined in TOML file.

required
shared Storage

Shared storage in the whole lifecycle.

required
logger Logger

The logger named with this Task.

required

Returns:

Type Description
int

The state defined in [Return enumeration][tasker.typedef.Return].

Source code in tasker/tasks/containers.py
@abstractmethod
def invoke(self, profile: Profile, shared: Storage, logger: Logger) -> int:
    """
    All activities of a task defined in this method.
    You can access configurations from profile object,
    access data from other tasks or provide data to other tasks by
    using shared. A logger is also provided with a task.
    Args:
        profile: Runtime profile defined in TOML file.
        shared: Shared storage in the whole lifecycle.
        logger: The logger named with this Task.
    Returns:
        The state defined in [`Return` enumeration][tasker.typedef.Return].
    """
    raise NotImplementedError('Please implement the task process.')

provide(self)

Declare the keys provided by this task.

Returns:

Type Description
List[str]

A list contains all keys which your task provides.

Warnings

If you want access any key from shared, please declare them in this method. Writing a key without declaration in provide will be forbidden.

Source code in tasker/tasks/containers.py
@abstractmethod
def provide(self) -> List[Text]:
    """
    Declare the keys provided by this task.
    Returns:
        A list contains all keys which your task provides.
    Warnings:
        If you want access any key from shared, please declare them in this method.
        Writing a key without declaration in `provide` will be forbidden.
    """
    raise NotImplementedError('Please define provided keys.')

remove(self)

Declare the keys removed by this task.

Returns:

Type Description
List[str]

A list contains all keys which your task removes.

Warnings

If you want access any key from shared, please declare them in this method. Deleting a key without declaration in remove will be forbidden.

Source code in tasker/tasks/containers.py
@abstractmethod
def remove(self) -> List[Text]:
    """
    Declare the keys removed by this task.
    Returns:
        A list contains all keys which your task removes.
    Warnings:
        If you want access any key from shared, please declare them in this method.
        Deleting a key without declaration in `remove` will be forbidden.
    """
    raise NotImplementedError('Please define removed keys.')

require(self)

Declare the keys required by this task.

Returns:

Type Description
List[str]

A list contains all keys which your task requires.

Warnings

If you want access any key from shared, please declare them in this method. Reading a key without declaration in require will be forbidden.

Source code in tasker/tasks/containers.py
@abstractmethod
def require(self) -> List[Text]:
    """
    Declare the keys required by this task.
    Returns:
        A list contains all keys which your task requires.
    Warnings:
        If you want access any key from shared, please declare them in this method.
        Reading a key without declaration in `require` will be forbidden.
    """
    raise NotImplementedError('Please define required keys.')

Utility tasks

SetEnvironmentTask

tasker.tasks.utils.SetEnvironmentTask

Task to set environment values.

define() classmethod

SetEnvironmentTask will not generate any template profiles. Each environment values can be filled as key-value pairs.

Returns:

Type Description
List[tasker.typedef.Definition]

Schema of profile.

Examples:

Set environment value of CUDA_VISIBLE_DEVICES to configure valid GPUs.

"CUDA_VISIBLE_DEVICES" = "0,2,4"

Source code in tasker/tasks/utils.py
@classmethod
def define(cls) -> List[Definition]:
    """
    SetEnvironmentTask will not generate any template profiles.
    Each environment values can be filled as key-value pairs.

    Returns:
        Schema of profile.
    Examples:
        Set environment value of `CUDA_VISIBLE_DEVICES` to configure valid GPUs.
        ```toml
        "CUDA_VISIBLE_DEVICES" = "0,2,4"
        ```
    """
    return []

invoke(self, profile, shared, logger)

Set environment values.

Parameters:

Name Type Description Default
profile Profile

Runtime profile defined in TOML file.

required
shared Storage

Shared storage in the whole lifecycle.

required
logger Logger

The logger named with this Task.

required

Returns:

Type Description
int

Always return [SUCCESS][tasker.typedef.Return.SUCCESS].

Source code in tasker/tasks/utils.py
def invoke(self, profile: Profile, shared: Storage, logger: Logger) -> int:
    """
    Set environment values.
    Args:
        profile: Runtime profile defined in TOML file.
        shared: Shared storage in the whole lifecycle.
        logger: The logger named with this Task.

    Returns:
        Always return [SUCCESS][tasker.typedef.Return.SUCCESS].
    """
    for name, content in profile.items():
        environ[name] = content
    logger.debug(f'Environment:')
    for item in environ.items():
        logger.debug(f'  {"=".join(item)}')
    return Return.SUCCESS.value

provide(self)

This task provides nothing.

Returns:

Type Description
List[str]

Nothing

Source code in tasker/tasks/utils.py
def provide(self) -> List[Text]:
    """
    This task provides nothing.

    Returns:
        Nothing
    """
    return []

remove(self)

This task removes nothing.

Returns:

Type Description
List[str]

Nothing

Source code in tasker/tasks/utils.py
def remove(self) -> List[Text]:
    """
    This task removes nothing.

    Returns:
        Nothing
    """
    return []

require(self)

This task requires nothing.

Returns:

Type Description
List[str]

Nothing

Source code in tasker/tasks/utils.py
def require(self) -> List[Text]:
    """
    This task requires nothing.

    Returns:
        Nothing
    """
    return []

StorageKeyCopyTask

tasker.tasks.utils.StorageKeyCopyTask

Move a key to another key.

invoke(self, profile, shared, logger)

All activities of a task defined in this method. You can access configurations from profile object, access data from other tasks or provide data to other tasks by using shared. A logger is also provided with a task.

Parameters:

Name Type Description Default
profile Profile

Runtime profile defined in TOML file.

required
shared Storage

Shared storage in the whole lifecycle.

required
logger Logger

The logger named with this Task.

required

Returns:

Type Description
int

The state defined in [Return enumeration][tasker.typedef.Return].

Source code in tasker/tasks/utils.py
def invoke(self, profile: Profile, shared: Storage, logger: Logger) -> int:
    shared[self.KEY_TO] = shared[self.KEY_FROM]
    return Return.SUCCESS

provide(self)

Declare the keys provided by this task.

Returns:

Type Description
List[str]

A list contains all keys which your task provides.

Warnings

If you want access any key from shared, please declare them in this method. Writing a key without declaration in provide will be forbidden.

Source code in tasker/tasks/utils.py
def provide(self) -> List[Text]:
    return [self.KEY_FROM, self.KEY_TO]

remove(self)

Declare the keys removed by this task.

Returns:

Type Description
List[str]

A list contains all keys which your task removes.

Warnings

If you want access any key from shared, please declare them in this method. Deleting a key without declaration in remove will be forbidden.

Source code in tasker/tasks/utils.py
def remove(self) -> List[Text]:
    return [self.KEY_FROM]

require(self)

Declare the keys required by this task.

Returns:

Type Description
List[str]

A list contains all keys which your task requires.

Warnings

If you want access any key from shared, please declare them in this method. Reading a key without declaration in require will be forbidden.

Source code in tasker/tasks/utils.py
def require(self) -> List[Text]:
    return [self.KEY_FROM, self.KEY_TO]

StorageKeyMoveTask

tasker.tasks.utils.StorageKeyMoveTask

Move a key to another key.

invoke(self, profile, shared, logger)

All activities of a task defined in this method. You can access configurations from profile object, access data from other tasks or provide data to other tasks by using shared. A logger is also provided with a task.

Parameters:

Name Type Description Default
profile Profile

Runtime profile defined in TOML file.

required
shared Storage

Shared storage in the whole lifecycle.

required
logger Logger

The logger named with this Task.

required

Returns:

Type Description
int

The state defined in [Return enumeration][tasker.typedef.Return].

Source code in tasker/tasks/utils.py
def invoke(self, profile: Profile, shared: Storage, logger: Logger) -> int:
    shared[self.KEY_TO] = shared[self.KEY_FROM]
    del shared[self.KEY_FROM]
    return Return.SUCCESS

provide(self)

Declare the keys provided by this task.

Returns:

Type Description
List[str]

A list contains all keys which your task provides.

Warnings

If you want access any key from shared, please declare them in this method. Writing a key without declaration in provide will be forbidden.

Source code in tasker/tasks/utils.py
def provide(self) -> List[Text]:
    return [self.KEY_FROM, self.KEY_TO]

remove(self)

Declare the keys removed by this task.

Returns:

Type Description
List[str]

A list contains all keys which your task removes.

Warnings

If you want access any key from shared, please declare them in this method. Deleting a key without declaration in remove will be forbidden.

Source code in tasker/tasks/utils.py
def remove(self) -> List[Text]:
    return [self.KEY_FROM]

require(self)

Declare the keys required by this task.

Returns:

Type Description
List[str]

A list contains all keys which your task requires.

Warnings

If you want access any key from shared, please declare them in this method. Reading a key without declaration in require will be forbidden.

Source code in tasker/tasks/utils.py
def require(self) -> List[Text]:
    return [self.KEY_FROM, self.KEY_TO]