programing

셀러리 작업을 단위 테스트하는 방법은 무엇입니까?

powerit 2023. 7. 17. 21:27
반응형

셀러리 작업을 단위 테스트하는 방법은 무엇입니까?

셀러리 문서에는 장고 내에서 셀러리를 테스트하는 방법이 언급되어 있지만 장고를 사용하지 않는 경우 셀러리 작업을 테스트하는 방법은 설명되어 있지 않습니다.이걸 어떻게 하나요?

모든 장치 테스트 lib를 사용하여 작업을 동시에 테스트할 수 있습니다.저는 보통 셀러리 작업을 할 때 두 가지 테스트를 합니다.첫 번째 것(아래에서 제안하는 바와 같이)은 완전히 동기화되어 있으며 알고리즘이 수행해야 할 작업을 수행하도록 보장하는 것이어야 합니다.두 번째 세션은 전체 시스템(브로커 포함)을 사용하며 직렬화 문제나 다른 배포, 통신 문제가 없는지 확인합니다.

그래서:

from celery import Celery

celery = Celery()

@celery.task
def add(x, y):
    return x + y

그리고 당신의 테스트는:

from nose.tools import eq_

def test_add_task():
    rst = add.apply(args=(4, 4)).get()
    eq_(rst, 8)

여기 제 7살짜리 대답에 대한 업데이트가 있습니다.

다음을 통해 별도의 스레드에서 작업자를 실행할 수 있습니다.pytest fixture:

https://celeryq.dev/en/v5.2.6/userguide/https.https#celeri-worker-celeri-live-worker

, 은 문에따르면다, 은사용안됩다니는서해음을서당신됩다▁use니안▁not▁according,▁should를 사용하면 안 됩니다."always_eager"(위 링크의 페이지 상단 참조).


이전 답변:

사용자:

with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True):
        ...

문서: https://docs.celeryq.dev/en/3.1/configuration.html#celery-always-eager

CELERY_ALWAYS_EAGER태스크를 동기화하여 실행할 수 있으므로 셀러리 서버가 필요하지 않습니다.

테스트할 항목에 따라 다릅니다.

  • 작업 코드를 직접 테스트합니다."task.delay(...)"를 부르지 마.)" 단위 테스트에서 "과제(...)"를 호출합니다.
  • CELERY_ALVERSEGER를 사용합니다.이렇게 하면 "task.delay(...)"라고 말하는 지점에서 작업이 즉시 호출되므로 전체 경로를 테스트할 수 있습니다(비동기 동작은 테스트할 수 없습니다).

셀러리 4에 있는 사람들은 다음과 같습니다.

@override_settings(CELERY_TASK_ALWAYS_EAGER=True)

설정 이름이 변경되어 업그레이드할 경우 업데이트가 필요하므로 을 참조하십시오.

https://docs.celeryproject.org/en/latest/history/whatsnew-4.0.html?highlight=what%20is%20new#lowercase-setting-names

유니트 테스트

import unittest

from myproject.myapp import celeryapp

class TestMyCeleryWorker(unittest.TestCase):

  def setUp(self):
      celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)

py.시험 설비

# conftest.py
from myproject.myapp import celeryapp

@pytest.fixture(scope='module')
def celery_app(request):
    celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
    return celeryapp

# test_tasks.py
def test_some_task(celery_app):
    ...

부록: send_task respect eager

from celery import current_app

def send_task(name, args=(), kwargs={}, **opts):
    # https://github.com/celery/celery/issues/581
    task = current_app.tasks[name]
    return task.apply(args, kwargs, **opts)

current_app.send_task = send_task

셀러리 3.0 기준, 한 가지 설정 방법CELERY_ALWAYS_EAGER장고어:

from django.test import TestCase, override_settings

from .foo import foo_celery_task

class MyTest(TestCase):

    @override_settings(CELERY_ALWAYS_EAGER=True)
    def test_foo(self):
        self.assertTrue(foo_celery_task.delay())

Celery v4.0 이후, 테스트용으로만 셀러리 작업자를 시작할 수 있는 py.test fixture가 제공되며, 테스트가 완료되면 종료됩니다.

def test_myfunc_is_executed(celery_session_worker):
    # celery_session_worker: <Worker: gen93553@mymachine.local (running)>
    assert myfunc.delay().wait(3)

http://docs.celeryproject.org/en/latest/userguide/testing.html#py-test, 에 설명된 다른 고정 장치 중에서 셀러리 기본 옵션을 다시 정의하여 변경할 수 있습니다.celery_config이 방법으로 고정:

@pytest.fixture(scope='session')
def celery_config():
    return {
        'accept_content': ['json', 'pickle'],
        'result_serializer': 'pickle',
    }

기본적으로 테스트 작업자는 메모리 내 브로커와 결과 백엔드를 사용합니다.로컬 Redis 또는 Rabbit를 사용할 필요 없음MQ(특정 기능을 테스트하지 않는 경우)

파이테스트를 사용한 참조.

def test_add(celery_worker):
    mytask.delay()

플라스크를 사용하는 경우 앱 구성을 설정합니다.

    CELERY_BROKER_URL = 'memory://'
    CELERY_RESULT_BACKEND = 'cache+memory://'

그리고.conftest.py

@pytest.fixture
def app():
    yield app   # Your actual Flask application

@pytest.fixture
def celery_app(app):
    from celery.contrib.testing import tasks   # need it
    yield celery_app    # Your actual Flask-Celery application

저의 경우(그리고 다른 많은 경우), 제가 원했던 것은 파이테스트를 사용하여 작업의 내부 논리를 테스트하는 것이었습니다.

TL;DR; 결국 모든 것을 조롱하고 말았습니다(옵션 2)


예제 사용 사례:

proj/tasks.py

@shared_task(bind=True)
def add_task(self, a, b):
    return a+b;

tests/test_tasks.py

from proj import add_task

def test_add():
    assert add_task(1, 2) == 3, '1 + 2 should equal 3'

그러나 그 이후shared_task장식가는 셀러리 내부 논리를 많이 합니다. 실제로는 단위 테스트가 아닙니다.

그래서 저에게는 두 가지 옵션이 있었습니다.

옵션 1: 별도의 내부 로직

proj/tasks_logic.py

def internal_add(a, b):
    return a + b;

proj/tasks.py

from .tasks_logic import internal_add

@shared_task(bind=True)
def add_task(self, a, b):
    return internal_add(a, b);

이것은 매우 이상해 보이고, 덜 읽기 쉽게 만드는 것 외에, 예를 들어, 요청의 일부인 속성을 수동으로 추출하고 전달해야 합니다.task_id필요할 때를 대비해서 논리를 덜 순수하게 만듭니다.

옵션 2: 모의 실행
셀러리 내부자들을 조롱하기

tests/__init__.py

# noinspection PyUnresolvedReferences
from celery import shared_task

from mock import patch


def mock_signature(**kwargs):
    return {}


def mocked_shared_task(*decorator_args, **decorator_kwargs):
    def mocked_shared_decorator(func):
        func.signature = func.si = func.s = mock_signature
        return func

    return mocked_shared_decorator

patch('celery.shared_task', mocked_shared_task).start()

그러면 요청 개체를 조롱할 수 있습니다(다시 말하지만, ID 또는 재시도 카운터와 같은 요청의 것이 필요할 경우).

tests/test_tasks.py

from proj import add_task

class MockedRequest:
    def __init__(self, id=None):
        self.id = id or 1


class MockedTask:
    def __init__(self, id=None):
        self.request = MockedRequest(id=id)


def test_add():
    mocked_task = MockedTask(id=3)
    assert add_task(mocked_task, 1, 2) == 3, '1 + 2 should equal 3'

이 솔루션은 훨씬 수동적이지만 셀러리 스코프를 잃지 않고 반복적으로 단위 테스트를 수행하는 데 필요한 제어 기능을 제공합니다.

많이 보여요.CELERY_ALWAYS_EAGER = true단위 테스트 방법에서는 단위 테스트를 위한 솔루션으로 사용할 수 있지만 버전 5.0.5가 사용 가능하기 때문에 대부분의 오래된 답변이 사용되지 않고 시간이 많이 소요되는 넌센스가 되는 많은 변경 사항이 있습니다. 따라서 솔루션을 검색하는 모든 사용자는 문서로 이동하여 새 버전에 대한 잘 문서화된 단위 테스트 예제를 읽어 보십시오.

https://docs.celeryproject.org/en/stable/userguide/testing.html

유닛 테스트가 포함된 Eager 모드에 대해서는 실제 문서에서 인용한 내용을 참조하십시오.

활성 모드

task_always_eager 설정으로 활성화된 eager 모드는 정의상 장치 테스트에 적합하지 않습니다.

eager 모드를 사용하여 테스트할 때 작업자에서 발생하는 작업에 대한 에뮬레이션만 테스트하고 에뮬레이션과 실제로 발생하는 작업 사이에는 많은 불일치가 있습니다.

다른 옵션은 태스크 실행의 부작용이 필요하지 않은 경우 태스크를 조롱하는 것입니다.

from unittest import mock


@mock.patch('module.module.task')
def test_name(self, mock_task): ...

언급URL : https://stackoverflow.com/questions/12078667/how-do-you-unit-test-a-celery-task

반응형