코딩관계론

사용자의 Custom 설정을 지원하는 비동기 프로그램(feat.Celery) 본문

개발/SPOT

사용자의 Custom 설정을 지원하는 비동기 프로그램(feat.Celery)

개발자_티모 2023. 4. 19. 01:50
반응형

소개

안녕하세요! 이번에는 Celery를 사용한 비동기 프로그래밍에 대해 알아보겠습니다. 최근에 업데이트한 프로그램에서 Celery를 도입하여 작업을 비동기적으로 처리하게 되었는데요, 이를 통해 사용자가 프로그램을 커스텀하게 구성할 수 있게 되었습니다.

 

우선, 비동기 프로그래밍이란 CPU-bound 작업과 I/O-bound 작업을 구분하여 처리하는 방식을 말합니다. 이를 통해 시스템의 성능을 향상시킬 수 있습니다. 이번에는 이러한 비동기 프로그래밍을 구현할 때 사용한 Celery에 대해 자세히 알아보겠습니다.

 

Celery는 Python 기반의 Task Queue 라이브러리 중 하나로, 비동기적으로 작업을 처리할 수 있습니다. 이를 통해 CPU-bound 작업과 I/O-bound 작업을 효율적으로 처리할 수 있으며, Task의 실행 시간과 주기를 유연하게 설정할 수 있고, 이러한 작업 스케줄링 기능은 크롤링, 백업, 알림 등 다양한 분야에서 활용됩니다. 따라서 저희는 이번 프로그램 업데이트에서 Celery를 도입하였습니다.

 

이번 블로그에서는 Celery를 사용한 비동기 프로그래밍의 장점과 함께, Celery를 사용하는 방법과 구현 시 주의해야 할 사항 등을 다룰 예정입니다. 

Celery 사용 방법

먼저 셀러리에 대한 기본 개념이 부족하다면 아래의 링크를 통해서 셀러리에 대해서 학습할 수 있습니다.

2023.04.18 - [TroubleShooting] - Celery필요성과 개념\

 

Celery필요성과 개념

비동기 처리의 개념과 필요성 비동기 처리는 순차적으로 처리하는 것이 아니라, 요청이 발생한 순서와 상관 없이 결과를 반환하는 방식을 말합니다. 예를 들어, 웹 어플리케이션에서 사용자의

bjwan-career.tistory.com

 

Celery 앱은 Celery Task Queue의 인스턴스를 말합니다. 샐러리 앱을 생성하는 것은 Celery 인스턴스를 만드는 것이며, 이 인스턴스는 Celery에서 제공하는 Celery 클래스를 사용하여 만들어집니다.

위의 코드에서 'sms.main'은 Celery 앱의 이름을 나타냅니다. 이 이름은 Celery에서 생성되는 Queue의 이름과도 관련이 있습니다. Queue는 Celery에서 Task를 처리하기 위해 사용되며, Queue는 이름을 가지고 있습니다. Queue의 이름은 Celery 앱의 이름과 동일하게 설정됩니다.


브로커는 Celery Task Queue에서 Task를 수행하기 위해 필요한 메시지 브로커입니다. Celery에서는 RabbitMQ, Redis, Amazon SQS, MongoDB 등 다양한 메시지 브로커를 지원합니다. 위의 코드에서는 Redis를 브로커로 사용하고 있습니다.


Backend는 Celery Task의 실행 결과를 저장하기 위한 데이터베이스입니다. Celery는 기본적으로 결과를 메모리에 저장하고 있지만, 결과를 장기간 보관해야 하는 경우에는 데이터베이스에 저장하는 것이 좋습니다. 위의 코드에서는 Redis를 Backend로 사용하고 있습니다.


따라서 위의 코드는 'sms.main'이라는 이름을 가진 Celery 앱을 생성하고, Redis를 브로커와 Backend로 사용하여 Celery Task Queue를 생성하는 것을 나타냅니다.

app = Celery('sms.main', broker='redis://redis-server:6379/0', backend='redis://redis-server:6379/0')

 

아래 코드는 Celery를 사용하여 작성된 Task 함수를 나타내는 코드입니다. 이 코드는 해당 함수를 Task로 만들기 위해 @app.task 데코레이터를 사용합니다.

 

Task 함수는 해당 함수가 실행되어야 할 때, sms.main Task Queue에 전달되고, Broker는 이 요청을 Task Worker에게 전달합니다. 이후 Task Worker는 해당 Task 함수를 실행하고, 결과 값을 반환합니다.

@app.task
def job():
    robots = util.get_all_robot()
    
    spot = smsHelper.SpotStaySmsHelper("210.97", "beta")
    
    #sms all send
    for robot in robots:
        spot = smsHelper.SpotStaySmsHelper(robot['hostname'], robot['nickname'])
        
        result = spot.check_is_dock()
        message = spot.create_message(result)
    
        spot.send_message(URL, message)

이제는 Task를 Worker가 실행할 수 있도록 등록해야 합니다. 이를 위해서는 Celery Application을 이용하여 Worker를 실행해야 합니다. 아래의 명령어는 Celery Application을 이용하여 Worker를 실행하는 것입니다.

celery -A sms.main worker --loglevel=info

 

위 명령어에서 -A 옵션은 Celery Application을 지정합니다. sms.main은 Celery Application이 정의된 모듈 이름입니다. worker는 Celery Worker를 실행하기 위한 명령어입니다.

 

Worker는 등록된 Task를 처리하기 위해 Task Queue를 구독합니다. Worker가 Task Queue를 구독하기 위해서는 broker와 task queue를 설정해야 합니다. 이 설정은 Celery Application에서 수행됩니다.

 

Worker는 Task Queue를 구독하면, 새로운 Task가 등록되면 해당 Task를 처리합니다. Task 처리는 다수의 Worker가 동시에 처리할 수 있으며, 처리된 Task는 결과값과 함께 반환됩니다.

 

Worker는 중요한 이벤트에 대한 로그를 출력하기 위해 --loglevel=info 옵션을 사용합니다. 이를 통해 Worker가 어떤 Task를 처리하는지, 얼마나 많은 Task를 처리했는지 등을 확인할 수 있습니다.

 

이제는 아래의 코드를 통해서 워커가 job 함수를 비동기적으로 실행해서 처리하는 것을 확인할 수 있습니다.

if __name__ == '__main__':
    # job()
    # 작업 추가
    result = job.apply_async(args=())

    # 작업 ID 출력
    task_id = result.id
    print(f'Task ID: {task_id}')

    # 작업 상태 출력
    while not result.ready():
        print('Task is not ready')
        time.sleep(1)
        
    print('Task is ready')
    print(f'Result: {result.get()}')

 

이제부터는 Celery를 사용하면서 겪은 어려움을 소개하겠습니다.

Q. Celery Borker에 작업만 등록해주면 Worker가 실행해주는거 아닌가요?

A. 결론적으로 아닙니다. Worker에게 Celery App의 위치를 참조할 수 있도록 설정해야합니다.
비동기 프로그래밍을 위해 Celery를 사용할 때, 작업을 Celery Broker에 등록하면 해당 작업을 처리하기 위한 Celery Worker가 등록된 작업을 가져와 처리합니다. 그러나 이때 Celery Worker는 작업을 처리하기 위해 Celery App을 사용합니다. Celery App은 Worker와 Task Queue 간의 매개체 역할을 하며, Celery Worker가 작업을 처리하기 위해서는 Celery App의 위치를 알아야 합니다.

Q. 그렇다면 한 프로젝트 안에서 모든 소스 코드가 관리되어야 하는 건가요?

A. 결론적으로 그렇지 않습니다. 우리는 도커의 볼륨이라는 좋은 시스템을 사용할 수 있습니다. 소스 코드가 물리적으로 분리되어 있더라도, 공유 볼륨을 설정하여 해당 볼륨을 통해 Celery Worker가 Celery App을 사용할 수 있습니다.

아래에는 실제로 사용한 도커 컴포즈 파일들이 있습니다. 이 방법을 사용하면 물리적으로 분리된 파일도 Celery를 통해 비동기적으로 실행할 수 있습니다.

#'x'디렉토리 하위에 존재하는 docker-compose 파일

version: '3'

services:
  smshandler:
    build:
      context: .
    container_name: smshandler
    volumes:
      - test-volume:/volume
    command: sh -c "cp -rf /app/* /volume/"

volumes:
  test-volume:
#'y'파일 하위에 존재하는 도커 컴포즈 파일   

workersms:
    build:
      context: .
    restart: unless-stopped
    container_name: celery_worker_sms
    environment:
      PYTHONPATH: /app/sms
    volumes:
      - regularservice_test-volume:/app/sms
    command: sh -c "celery -A sms.main worker --loglevel=info"

volumes:
  regularservice_test-volume:
    external:
      name: regularservice_test-volume

Q. Celery Worker가 실행하면 파일들의 import error가 발생해요.

A. Celery를 사용하는 경우, 모든 모듈이 Celery worker에서 실행됩니다. 따라서 celery를 사용하면 모듈을 로드하는 방법에 대해 주의가 필요합니다. 저는 도커 컴포즈에 PYTHONPATH를 추가하는 방식으로 이 문제를 해결했습니다.

아래는 실제로 도커 컴포즈 파일의 예제입니다.

#'y'파일 하위에 존재하는 도커 컴포즈 파일   

workersms:
    build:
      context: .
    restart: unless-stopped
    container_name: celery_worker_sms
    environment:
      PYTHONPATH: /app/sms	#이 부분입니다.
    volumes:
      - regularservice_test-volume:/app/sms
    command: sh -c "celery -A sms.main worker --loglevel=info"
반응형