Разработка на коленке

"тут должна быть красивая цитата о программировании"

Асинхронные задачи в uwsgi и мини-фреймворк Falcon

2016-07-07 19:33

Обычно при запуске веб-проектов на python асинхронные задачи в фоне выполняются при помощи Celery. Но в этот раз у меня на сервере был uwsgi, а разворачивать celery для одной-двух задач мне не хотелось. Поэтому решил посмотреть на uwsgidecorators и его spool.

Проект на сервере разработан на django, но запускать такую большую штуку только для тестирования spool было перебором, а тут рядом была открыта страница с документацией по Falcon. Вот так у меня появился подопытный фреймворк.

Быстро написал скрипт для запуска задачи в фоне. Задача запускалась, в терминал через каждую секунду асинхронно сыпались print('hi'). После этого стал налаживать обмен данными между обработчиком запроса и задачей. Придумал "задачу", в которой обработчик запроса должен создавать в словаре хеш от даты, а потом асинхронно считать сумму кодов символов в этом хеше.

Первый пример, который должен был работать

import hashlib
import json

from datetime import datetime

import falcon

from uwsgidecorators import spool


storage = {}


@spool
def _task(arguments):
    key = arguments['key']
    storage[key] = sum(ord(ch) for ch in key)


class Handler(object):
    def on_get(self, req, resp):
        key = hashlib.md5(str(datetime.now()).encode('utf-8')).hexdigest()
        storage[key] = None

        _task.spool(key=key)

        resp.body = json.dumps({'storage': storage})


serv = falcon.API()
serv.add_route('/', Handler())

Запуск сервера из командной строки

uwsgi --http :9111 --wsgi-file sample.py --callable serv --master --spooler ./spool

Но пример не работал, а выдавал в браузер:

{

    "storage": {
        "77e609faa5ca12e6fc37273bf07f9308": null,
        "0cfc5e83db37eb0eedf989aa0e894f63": null,
    }

}

Закралось подозрение, что uwsgi создаёт разные объекты для _storage. Проверил id:

@spool
def _task(arguments):
    key = arguments['key']
    _storage[key] = sum(ord(ch) for ch in key)

    print('=' * 80)
    print(id(_storage))
    print(_storage)
    print('=' * 80)


class Handler(object):
    def on_get(self, req, resp):
        key = hashlib.md5(str(datetime.now()).encode('utf-8')).hexdigest()
        _storage[key] = None

        _task.spool(key=key)

        resp.body = json.dumps({'storage': _storage})
        print('-' * 80)
        print(id(_storage))
        print(_storage)
        print('-' * 80)

ID объектов были одинаковые.

--------------------------------------------------------------------------------
140613325993160
{'7ca17ee5c8cd055f8cebd19c0e44a393': None, '8c7e86712679f74574396e02c6af5e4e': None}
--------------------------------------------------------------------------------
================================================================================
140613325993160
{'7ca17ee5c8cd055f8cebd19c0e44a393': 2388, '8c7e86712679f74574396e02c6af5e4e': 2130}
================================================================================

Но где-то же была ошибка. Проверил номер процесса:

@spool
def _task(arguments):
    key = arguments['key']

    print('=' * 80)
    print(os.getpid())
    print('=' * 80)

    _storage[key] = sum(ord(ch) for ch in key)


class Handler(object):
    def on_get(self, req, resp):
        key = hashlib.md5(str(datetime.now()).encode('utf-8')).hexdigest()
        _storage[key] = None

        _task.spool(key=key)

        resp.body = json.dumps({'storage': _storage})
        print('-' * 80)
        print(os.getpid())
        print('-' * 80)

А вот номера процессов отличались. Как так получилось, что совпали айдишники объектов - не знаю. Нужно посмотреть, как конкретно запускается spooler в uwsgi.

Вот так не получилось по-быстрому сделать пример обмена данными между задачей и обработчиком запроса. Тут стоит сказать, что пересылать между ними обычные объекты не получится. Но азарт заставил задуматься над способом обмена данными.

В живом проекте для этих целей, вероятнее всего, использовалась бы база данных. Здесь выдумывать что-то другое не стал, взял sqlite, к которой с давних пор отношусь с теплотой.

Для начала разделил задачи и приложение на два отдельных файла, раз уж под них идут разные процессы.

# -*- coding: utf-8 -*-

import falcon
import hashlib
import json
import sqlite3

from datetime import datetime

from tasks import hashsum


class Handler(object):
    def __init__(self):
        self.db = sqlite3.connect('storage.sqlite3')

        cur = self.db.cursor()
        cur.execute('CREATE TABLE IF NOT EXISTS keys (key TEXT, val INTEGER)')
        self.db.commit()

    def add_key(self, key):
        cur = self.db.cursor()
        cur.execute('INSERT INTO keys (key) values(?)', (key, ))
        self.db.commit()

    def get_data(self):
        data = {}
        cur = self.db.cursor()
        for key, val in cur.execute('SELECT key, val FROM keys'):
            data[key] = val
        self.db.commit()
        return data

    def on_get(self, req, resp):
        key = hashlib.md5(str(datetime.now()).encode('utf-8')).hexdigest()
        self.add_key(key)

        resp.body = json.dumps({'db': self.get_data()})

        hashsum.spool(key=key)


serv = falcon.API()
serv.add_route('/', Handler())
# -*- coding: utf-8 -*-

import sqlite3

from uwsgidecorators import spool


@spool
def hashsum(arguments):
    key = arguments['key']
    val = sum(ord(ch) for ch in key)

    db = sqlite3.connect('storage.sqlite3')
    cur = db.cursor()
    cur.execute('UPDATE keys SET val=? WHERE key=?', (val, key))
    db.commit()
    db.close()

Тут я увидел, что текущая директория для процессов разная, как результат два разных файла базы данных. Нужно было выносить настройки в отдельный файл. Так появился settings.py

import os

DB_NAME = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    'storage.sqlite3')
# -*- coding: utf-8 -*-

import sqlite3

from uwsgidecorators import spool

from settings import DB_NAME


@spool
def hashsum(arguments):
    key = arguments['key']
    val = sum(ord(ch) for ch in key)

    db = sqlite3.connect(DB_NAME)
    cur = db.cursor()
    cur.execute('UPDATE keys SET val=? WHERE key=?', (val, key))
    db.commit()
    db.close()
# -*- coding: utf-8 -*-

import falcon
import hashlib
import json
import sqlite3

from datetime import datetime

from settings import DB_NAME
from tasks import hashsum


class Handler(object):
    def __init__(self):
        self.db = sqlite3.connect(DB_NAME)

        cur = self.db.cursor()
        cur.execute('CREATE TABLE IF NOT EXISTS keys (key TEXT, val INTEGER)')
        self.db.commit()

    def add_key(self, key):
        cur = self.db.cursor()
        cur.execute('INSERT INTO keys (key) values(?)', (key, ))
        self.db.commit()

    def get_data(self):
        data = {}
        cur = self.db.cursor()
        for key, val in cur.execute('SELECT key, val FROM keys'):
            data[key] = val
        self.db.commit()
        return data

    def on_get(self, req, resp):
        key = hashlib.md5(str(datetime.now()).encode('utf-8')).hexdigest()
        self.add_key(key)

        resp.body = json.dumps({'db': self.get_data()})

        hashsum.spool(key=key)


serv = falcon.API()
serv.add_route('/', Handler())

Всё заработало как надо. Результат работы в браузере:

{

    "db": {
        "35f3fd25fc8abea97908b13854cfc813": 2300,
        "da90ec1480702833e9f16d6ea3d59a7a": 2243,
        "d70719f8adb446aa04accf8f12cd8984": null,
        "23a06eda818e0abae56c58498e4bb4c1": 2330
    }

}

Вот так получилось, что к приложению на мини-фреймворке пришлось прикручивать файл настроек, чтобы запустить задачу в фоне.

На самом деле, нужно посмотреть, как похожие задачи решают сами разработчики Falcon, потому что я пошёл по явно "джанговскому" пути. Просто очень много работаю с Django, вот и привык к таким его частям, как файл настроек.

comments powered by Disqus