Source code for pytest_reana.fixtures

# -*- coding: utf-8 -*-
# This file is part of REANA.
# Copyright (C) 2018, 2019, 2020, 2021, 2022, 2023 CERN.
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Pytest fixtures for REANA."""

from __future__ import absolute_import, print_function

import base64
import json
import os
import shutil
from uuid import uuid4

import pkg_resources
import pytest
from kombu import Connection, Exchange, Queue
from kubernetes import client
from mock import Mock, patch
from reana_commons.consumer import BaseConsumer
from reana_db.utils import build_workspace_path
from sqlalchemy import create_engine
from sqlalchemy.schema import CreateSchema
from sqlalchemy_utils import create_database, database_exists

[docs]@pytest.yield_fixture(scope="module") def tmp_shared_volume_path(tmpdir_factory): """Fixture temporary file system database. Scope: module This fixture offers a temporary shared file system using ``tmpdir_factory.mktemp`` named ``reana`` and when scoped is finished it will be deleted. .. code-block:: python import os def test_dir_exists(tmp_shared_volume_path): path = os.path.join( tmp_shared_volume_path, 'directory_path') assert os.path.exists(path) """ shared_volume_path = os.getenv("SHARED_VOLUME_PATH", "") temp_path = None if not os.path.exists(shared_volume_path): temp_path = str(tmpdir_factory.mktemp("reana")) yield temp_path or shared_volume_path if temp_path: shutil.rmtree(temp_path)
[docs]@pytest.fixture() def session(): """Create a SQL Alchemy session. Scope: function This fixture offers a SQLAlchemy session which has been created from the ``db_engine`` fixture. .. code-block:: python from reana_db.models import Workflow def test_create_workflow(session): workflow = Workflow(...) session.add(workflow) session.commit() """ from reana_db.database import Session yield Session Session.close()
[docs]@pytest.fixture() def app(base_app): """Flask application fixture. Scope: function This fixture offers a Flask application with already a database connection and all the models created. When finished it will delete all models. .. code-block:: python def create_ninja_turtle() with app.test_client() as client: somedata = 'ninja turtle' res ='api.create_object'), content_type='application/json', data=json.dumps(somedata)) assert res.status_code == 200 """ from reana_db.database import Session from reana_db.models import Base, Resource engine = create_engine(base_app.config["SQLALCHEMY_DATABASE_URI"]) base_app.session.bind = engine with base_app.app_context(): if not engine.dialect.has_schema(engine, "__reana"): engine.execute(CreateSchema("__reana")) if not database_exists(engine.url): create_database(engine.url) Base.metadata.create_all(bind=engine) Resource.initialise_default_resources() yield base_app Session.close() # close hanging connections Base.metadata.drop_all(bind=engine)
[docs]@pytest.fixture() def default_user(app, session): """Create users. Scope: function This fixture creates an user with a default UUID ``00000000-0000-0000-0000-000000000000``, ``email`` `` and ``access_token`` ``secretkey`` and returns it. .. code-block:: python def test_default_user_exists(default) with app.test_client() as client: res ='api.get_users'), query_string={"user": default_user.id_}) assert res.status_code == 200 """ from reana_db.models import User default_user_id = "00000000-0000-0000-0000-000000000000" user = User.query.filter_by(id_=default_user_id).first() if not user: with patch("reana_db.database.Session", return_value=session): user = User( id_=default_user_id, email="", access_token="secretkey" ) session.add(user) session.commit() return user
[docs]@pytest.fixture() def serial_workflow(): """Create a serial workflow. Scope: function This fixture provides a ``serial`` workflow. """ return { "reana_specification": { "workflow": { "specification": { "steps": [ { "environment": "", "commands": ["echo 'Hello REANA'"], } ] }, "type": "serial", }, }, }
[docs]@pytest.fixture() def cwl_workflow_with_name(): """CWL workflow with name. Scope: function This fixture provides a ``CWL`` workflow with a name as a dictionary. """ return { "reana_specification": { "inputs": {"parameters": {"min_year": "1991", "max_year": "2001"}}, "workflow": { "specification": {"first": "do this", "second": "do that"}, "type": "cwl", }, "type": "cwl", }, "workflow_name": "my_test_workflow", }
[docs]@pytest.fixture() def yadage_workflow_with_name(): """Yadage workflow with name. Scope: function This fixture provides a ``yadage`` workflow with a name as a dictionary. """ return { "reana_specification": { "workflow": { "specification": {"first": "do this", "second": "do that"}, "type": "yadage", }, "inputs": {"parameters": {"min_year": "1991", "max_year": "2001"}}, "type": "yadage", }, "name": "my_test_workflow", }
[docs]@pytest.fixture() def cwl_workflow_without_name(): """CWL workflow without name. Scope: function This fixture provides a ``CWL`` workflow without a name as a dictionary. """ return { "reana_specification": { "inputs": {"parameters": {"min_year": "1991", "max_year": "2001"}}, "workflow": { "specification": {"first": "do this", "second": "do that"}, "type": "cwl", }, "type": "cwl", }, "name": "", }
[docs]@pytest.fixture() def yadage_workflow_without_name(): """Yadage workflow without name. Scope: function This fixture provides a ``yadage`` workflow without name as a dictionary. """ return { "reana_specification": { "workflow": { "specification": {"first": "do this", "second": "do that"}, "type": "yadage", }, "inputs": {"parameters": {"min_year": "1991", "max_year": "2001"}}, "type": "yadage", }, "name": "", }
[docs]@pytest.fixture() def yadage_workflow_spec_loaded(): """Nested Yadage workflow. Scope: function This fixture provides a nested ``yadage`` workflow spec loaded as dictionary. """ return { "version": "0.7.2", "inputs": { "files": ["code/foo.C", "code/bar.C"], "directories": ["workflow/yadage"], "parameters": {"foo": "foo_val", "bar": "bar_val", "baz": "baz_val"}, }, "outputs": {"files": ["fitdata/plot.png"]}, "workflow": { "type": "yadage", "file": "workflow/yadage/workflow.yaml", "specification": { "stages": [ { "name": "gendata", "dependencies": { "dependency_type": "jsonpath_ready", "expressions": ["init"], }, "scheduler": { "scheduler_type": "singlestep-stage", "parameters": [ { "key": "foo", "value": { "step": "init", "output": "foo", "expression_type": "stage-output-selector", }, }, { "key": "bar", "value": { "step": "init", "output": "bar", "expression_type": "stage-output-selector", }, }, ], "step": { "process": { "process_type": "interpolated-script-cmd", "script": "python --foo '{foo}/{bar}'", "interpreter": "sh", }, "publisher": { "publisher_type": "frompar-pub", "outputmap": {"data": "outfilename"}, }, "environment": { "environment_type": "docker-encapsulated", "image": "", "imagetag": "6.18.04", "resources": [], "envscript": "", "env": {}, "workdir": None, "par_mounts": [], }, }, }, }, { "name": "fitdata", "dependencies": { "dependency_type": "jsonpath_ready", "expressions": ["gendata"], }, "scheduler": { "scheduler_type": "singlestep-stage", "parameters": [ { "key": "baz", "value": { "step": "init", "output": "baz", "expression_type": "stage-output-selector", }, }, { "key": "bar", "value": { "step": "gendata", "output": "bar", "expression_type": "stage-output-selector", }, }, ], "step": { "process": { "process_type": "interpolated-script-cmd", "script": 'root -b -q \'("{baz}","{bar}")\'', "interpreter": "sh", }, "publisher": { "publisher_type": "frompar-pub", "outputmap": {"plot": "outfile"}, }, "environment": { "environment_type": "docker-encapsulated", "image": "", "imagetag": "6.18.04", "resources": [], "envscript": "", "env": {}, "workdir": None, "par_mounts": [], }, }, }, }, { "name": "parent_step", "dependencies": { "dependency_type": "jsonpath_ready", "expressions": [""], }, "scheduler": { "scheduler_type": "singlestep-stage", "parameters": [ { "key": "nested_foo", "value": { "step": "init", "output": "nested_foo", "expression_type": "stage-output-selector", }, }, ], "workflow": { "stages": [ { "name": "nested_step", "dependencies": { "dependency_type": "jsonpath_ready", "expressions": ["run_mc"], }, "scheduler": { "scheduler_type": "singlestep-stage", "parameters": [ { "key": "nested_foo", "value": { "step": "init", "output": "nested_foo", }, }, { "key": "inputs", "value": { "stages": "run_mc[*].mergeallvars", "output": "mergedfile", "expression_type": "stage-output-selector", }, }, { "key": "mergedfile", "value": "{workdir}/merged.root", }, ], "step": { "process": { "process_type": "interpolated-script-cmd", "interpreter": "bash", "script": "source /usr/local/bin/{nested_foo}.sh\nhadd {mergedfile} {inputs}\n", }, "environment": { "environment_type": "docker-encapsulated", "image": "", "imagetag": "6.18.04", "resources": [], "envscript": "", "env": {}, "workdir": None, "par_mounts": [], }, "publisher": { "publisher_type": "frompar-pub", "outputmap": { "mergedfile": "mergedfile" }, }, }, }, } ] }, }, }, ] }, }, }
[docs]@pytest.fixture() def snakemake_workflow_spec_loaded(): """Scatter-gather Snakemake workflow. Scope: function This fixture provides the internal representation of a basic scatter-gather ``snakemake`` workflow loaded as dictionary. """ return { "version": "0.8.0", "workflow": { "type": "snakemake", "file": "workflow/snakemake/Snakemake", "specification": { "job_dependencies": { "all": ["gather"], "gather": ["scatterA", "scatterB"], "scatterA": [], "scatterB": [], }, "steps": [ { "commands": ["sleep 15 && mkdir -p results && touch {output}"], "environment": "python:2.7-slim", "inputs": {}, "kubernetes_memory_limit": None, "kubernetes_uid": None, "name": "scatterA", "outputs": {}, "params": {}, }, { "commands": ["sleep 30 && mkdir -p results && touch {output}"], "environment": "python:2.7-slim", "inputs": {}, "kubernetes_memory_limit": None, "kubernetes_uid": None, "name": "scatterB", "outputs": {}, "params": {}, }, { "commands": ["sleep 5 && touch {output}"], "environment": "python:2.7-slim", "inputs": {}, "kubernetes_memory_limit": None, "kubernetes_uid": None, "name": "gather", "outputs": {}, "params": {}, }, ], }, }, }
class _BaseConsumerTestIMPL(BaseConsumer): """Test implementation of a REANAConsumer class. This class is a basic implementation of a ``reana_commons.BaseConsumer`` to use while testing. There are fixtures wrapping it so it shouldn't be used directly. """ def get_consumers(self, Consumer, channel): """Sample get consumers method.""" return [ Consumer( self.queue, callbacks=[self.on_message], accept=[self.message_default_format], ) ] def on_message(self, body, message): """Sample on message method.""" message.ack()
[docs]@pytest.fixture def ConsumerBase(): """Return a class implementing a BaseConsumer. Scope: function This fixture offers a class which implements the ``reana_commons.BaseConsumer``. It will just acknowledge the received messages. """ return _BaseConsumerTestIMPL
[docs]@pytest.fixture def ConsumerBaseOnMessageMock(ConsumerBase): """Return a BaseConsumer class with ``on_message`` mocked. Scope: function This fixture offers a class which implements the ``reana_commons.BaseConsumer``. Additionally to the ``ConsumerBase`` fixture, this class has an ``on_message`` mocked method so actions like the following can be performed. .. code-block:: python def test_msg(ConsumerBaseOnMessageMock) consumer = ConsumerBaseOnMessageMock() # 1 message is published with message {'some': 'message'} expected_body = {'some': 'message'} consumer.on_message.assert_called_once_with( expected, ANY) """ with patch.object(ConsumerBase, "on_message"): yield ConsumerBase
[docs]@pytest.fixture def consume_queue(): """Provide a callable to consume a queue. Scope: function This fixture offers a function which given a ``kombu.Consumer`` will consume the messages in the queue until a certain ``limit``. If ``limit`` is not specified it will consume uninterruptedly. .. code-block:: python def test_consume_1_msg(ConsumerBase, consume_queue) consumer = ConsumerBase() # some message is published in the queue # and we want to consume only one. consume_queue(consumer, limit=1) """ def _consume_queue(consumer, limit=None): """Consume AMQP queue. :param consumer: A class:`kombu.Consumer` to consume from. :param limit: Integer which represents how many items to consume from the queue, if not specified, the consume method will run uninterruptedly. """ consumer_generator = consumer.consume(limit=limit) while True: try: next(consumer_generator) except StopIteration: # no more items to consume in the queue break return _consume_queue
[docs]@pytest.fixture(scope="session") def in_memory_queue_connection(): """In memory message queue. Scope: session This fixture offers an in memory class:`kombu.Connection` scoped to the testing session. .. code-block:: python def test_something(ConsumerBase, in_memory_queue_connection): consumer = ConsumerBase(connection=in_memory_queue_connection) # Now you have a consumer connected to an in memory queue """ return Connection("memory:///")
[docs]@pytest.fixture def default_exchange(): """Return a default class:`kombu.Exchange` created from configuration. Scope: function This fixture offers a default class:`kombu.Exchange`. """ return Exchange("test-exchange", type="direct")
[docs]@pytest.fixture def default_queue(default_exchange): """Return a default class:`kombu.Queue` created from configuration. Scope: function This fixture offers a default class:`kombu.Queue`. """ return Queue( "test-queue", exchange=default_exchange, routing_key="test-routing-key" )
[docs]@pytest.fixture def default_in_memory_producer(in_memory_queue_connection, default_exchange): """Rerturn a class:`kombu.Producer` connected to in memory queue. Scope: function This fixture offers a default class:`kombu.Producer` instantiated using the ``in_memory_queue_connection``. .. code-block:: python def test_publish_hello(default_in_memory_producer, default_queue): msg = {'hello': 'ninja turtle'} default_in_memory_producer.publish(msg, declare=[default_queue]) """ return in_memory_queue_connection.Producer( exchange=default_exchange, routing_key="test-routing-key", serializer="json" )
[docs]@pytest.fixture(scope="module") def sample_workflow_workspace(tmp_shared_volume_path): """Return the directory path of a sample workspace. Scope: module Creates a sample workspace in the shared volume path. Copies contents from the ``tests/test_workspace`` directory. """ def _create_sample_workflow_workspace(relative_workspace_path): empty_workspace = os.path.join(tmp_shared_volume_path, relative_workspace_path) if not os.path.exists(empty_workspace): os.makedirs(empty_workspace) yield empty_workspace return _create_sample_workflow_workspace
[docs]@pytest.fixture() def sample_yadage_workflow_in_db( app, default_user, session, yadage_workflow_with_name, sample_workflow_workspace, tmp_shared_volume_path, ): """Create a sample workflow in the database. Scope: function Adds a sample yadage workflow in the DB. """ from reana_db.models import Workflow workflow_id = uuid4() relative_workspace_path = build_workspace_path( default_user.id_, workflow_id, tmp_shared_volume_path ) next(sample_workflow_workspace(relative_workspace_path)) workflow = Workflow( id_=workflow_id, name="sample_serial_workflow_1", owner_id=default_user.id_, reana_specification=yadage_workflow_with_name["reana_specification"], operational_options={}, type_=yadage_workflow_with_name["reana_specification"]["workflow"]["type"], logs="", workspace_path=relative_workspace_path, ) session.add(workflow) session.commit() yield workflow for resource in workflow.resources: session.delete(resource) session.delete(workflow) session.commit()
[docs]@pytest.fixture() def sample_serial_workflow_in_db( app, default_user, session, serial_workflow, sample_workflow_workspace, tmp_shared_volume_path, ): """Create a sample workflow in the database. Scope: function Adds a sample serial workflow in the DB. """ from reana_db.models import Workflow workflow_id = uuid4() relative_workspace_path = build_workspace_path( default_user.id_, workflow_id, tmp_shared_volume_path ) next(sample_workflow_workspace(relative_workspace_path)) workflow = Workflow( id_=workflow_id, name="sample_serial_workflow_1", owner_id=default_user.id_, reana_specification=serial_workflow["reana_specification"], operational_options={}, type_=serial_workflow["reana_specification"]["workflow"]["type"], logs="", workspace_path=relative_workspace_path, ) session.add(workflow) session.commit() yield workflow for resource in workflow.resources: session.delete(resource) session.delete(workflow) session.commit()
[docs]def sample_condition_for_starting_queued_workflows(): """Sample always true condition.""" return True
[docs]def sample_condition_for_requeueing_workflows(): """Sample always false condition.""" return False
[docs]@pytest.fixture def no_db_user(): """Mock user created without using db.""" user = Mock() user.id_ = uuid4() return user
[docs]@pytest.fixture def user_secrets(): """Test user secrets dictionary.""" keytab_file = base64.b64encode(b"keytab file.") user_secrets = { "username": {"value": "reanauser", "type": "env"}, "password": {"value": "1232456", "type": "env"}, ".keytab": {"value": keytab_file, "type": "file"}, } return user_secrets
[docs]@pytest.fixture def kerberos_user_secrets(): """User secrets needed by Kerberos.""" user_secrets = { "CERN_USER": {"value": b"johndoe", "type": "env"}, "CERN_KEYTAB": {"value": b".keytab", "type": "env"}, ".keytab": {"value": b"keytab file", "type": "file"}, } for secret in user_secrets.values(): secret["value"] = base64.b64encode(secret["value"]) return user_secrets
[docs]@pytest.fixture def empty_user_secrets(): """Empty user secrets dictionary.""" return {}
[docs]@pytest.fixture def corev1_api_client_with_user_secrets(no_db_user): """Kubernetes CoreV1 api client with user secrets in K8s secret store. Scope: function Adds the CoreV1APIClient with example user secrets. """ def make_corev1_api_client_with_user_secrets(user_secrets): """Callable to return. Should be used with one of the secret store fixtures. """ corev1_api_client = Mock() metadata = client.V1ObjectMeta(name=str(no_db_user.id_)) metadata.annotations = {"secrets_types": "{}"} user_secrets_values = {} secrets_types = {} for secret_name in user_secrets: # Add type metadata to secret store secrets_types[secret_name] = user_secrets[secret_name]["type"] user_secrets_values[secret_name] = user_secrets[secret_name]["value"] metadata.annotations["secrets_types"] = json.dumps(secrets_types) k8s_secrets_store = client.V1Secret( api_version="v1", metadata=metadata, data=user_secrets_values ) corev1_api_client.read_namespaced_secret = ( lambda name, namespace: k8s_secrets_store ) return corev1_api_client return make_corev1_api_client_with_user_secrets