Просмотр поста

.
reaper
Backend

Бэкэнд будет состоять из нескольких частей:

Worker -- отвечает за сборку проектов.
Web -- REST API, написанное на фреймворке Flask
Broadcast -- приложение, написанное на Tornado, которое позволит отображать ход сборки в режиме реального времени.

Все они будут связаны между собой с помощью ZeroMQ.
Из веб приложения отсылается команда воркеру на сборку проекта.
При просмотре билда tornado будет считывать лог и отправлять его клиенту через WebSockets.
По окончанию сборки воркер сообщает об этом tornado, а то в свою очередь отправляет сообщение клиенту.

Для начала давайте создадим таблицы в БД.

$ psql buildserver

Выполняем:
CREATE TABLE "projects" (
    "id" SERIAL PRIMARY KEY,
    "name" VARCHAR(70) NOT NULL,
    "description" VARCHAR(200) NOT NULL DEFAULT '',
    "url" VARCHAR(200) NOT NULL
);

CREATE TABLE "builds" (
    "id" SERIAL PRIMARY KEY,
    "project_id" INTEGER NOT NULL REFERENCES "projects" ("id") ON DELETE RESTRICT,
    "start_date" INTEGER NOT NULL,
    "finish_date" INTEGER NOT NULL,
    "state" VARCHAR(10) NOT NULL
);


REFERENCES "projects" ("id") ON DELETE RESTRICT означает, что поле ссылается на поле id в таблице projects.

Если мы попытаемся создать билд, указав project_id, который отсутствует в таблице projects, то у нас ничего не получится.
При удалении проекта, нужно будет удалить сначала все сборки.
В ином случае postgres просто пошлёт нас куда подальше.
Подробности здесь: http://postgresql.ru.net/manua ... TS-FK

Большинство пакетов для работы с базой данных реализуют Database API Specification 2.0
https://www.python.org/dev/pep ... 0249/

Вся работа сводится к созданию подключения, получению курсора, выполнению запроса и коммита транзакции.

import psycopg2

conn = psycopg2.connect('postgresql://username:password@localhost/database')
cur = conn.cursor()
cur.execute('INSERT INTO "tablename" ("name", "desc") VALUES (%s, %s)', ('name-val', 'desc-val'))
try:
    conn.commit()
except:
    # При возникновении исключения откатываем транзакцию. 
    # В ином случае следующий запрос нельзя будет совершить.
    conn.rollback()
conn.close()



Создаём файл src/buildserver/app/repositories.py
В нём будут располагаться классы, необходимые для работы с БД.
repositories.py (+/-)

from psycopg2.extras import RealDictCursor
from time import time


class Base(object):
    __tablename__ = None
    __primary_key__ = None

    def __init__(self, connection):
        if self.__tablename__ is None:
            raise ValueError('Table name is not defined')
        self.connection = connection

    def _cursor(self):
        return self.connection.cursor(cursor_factory=RealDictCursor)

    def _insert(self, columns, values):
        # Осуществляет INSERT запрос
        cursor = self._cursor()
        placeholders = ('%s, ' * len(columns)).strip(', ')
        columns = '", "'.join(columns)
        columns = '"{}"'.format(columns)
        statement = 'INSERT INTO {table} ({cols}) VALUES({places})'.format(
            table=self.__tablename__,
            cols=columns,
            places=placeholders
        )
        cursor.execute(statement, values)
        self.connection.commit()
        # Получаем последний вставленный ID
        if self.__primary_key__:
            cursor.execute('SELECT "last_value" FROM "{}_{}_seq"'.format(self.__tablename__, self.__primary_key__))
            return cursor.fetchone()['last_value']

    def _update(self, columns, values, key, value):
        # Осуществляет UPDATE запрос
        self._cursor().execute(
            'UPDATE {table} SET "{values}" = %s WHERE {key} = %s'.format(
                table=self.__tablename__,
                values='" = %s, "'.join(columns),
                key=key
            ),
            list(values) + [value]
        )
        self.connection.commit()

    def _delete(self, key, value):
        # Осуществляет DELETE запрос
        self._cursor().execute('DELETE FROM {} WHERE "{}" = %s'.format(self.__tablename__, key), [value])
        self.connection.commit()

    def _find(self, key, value):
        # Находит запись в таблице
        cursor = self._cursor()
        cursor.execute('SELECT * FROM {} WHERE "{}" = %s'.format(self.__tablename__, key), (value, ))
        return cursor.fetchone()


class Builds(Base):
    __tablename__ = 'builds'
    __primary_key__ = 'id'

    def all(self, pid):
        # Список всех сборок для проекта
        cursor = self._cursor()
        cursor.execute(
            'SELECT * FROM {} WHERE "project_id" = %s ORDER BY "start_date" DESC'.format(self.__tablename__),
            [pid]
        )
        return cursor.fetchall()

    def start(self, project_id):
        # Создаёт билд
        columns = ('project_id', 'start_date', 'finish_date', 'state')
        values = (project_id, int(time()), 0, 'running')
        return self._insert(columns, values)

    def finish(self, bid, state):
        # Отмечает билд завершённым
        if state not in ['success', 'failed']:
            raise ValueError('Unexpected state given')
        self._update(('finish_date', 'state'), (int(time()), state), 'id', bid)

    def remove(self, pid):
        # Удаляет все сборки проекта
        self._delete('project_id', pid)

    def find(self, bid):
        # Возвращает билд
        return self._find('id', bid)


class Projects(Base):
    __tablename__ = 'projects'
    __primary_key__ = 'id'

    def all(self):
        # Возвращает список всех проектов
        cursor = self._cursor()
        cursor.execute('SELECT * FROM "{}" ORDER BY "name" ASC'.format(self.__tablename__))
        return cursor.fetchall()

    def save(self, name, description, url, pid=None):
        # Создаёт/обновляет проект
        columns = ('name', 'description', 'url')
        values = (name, description, url)
        if pid is not None:
            self._update(columns, values, 'id', pid)
        else:
            pid = self._insert(columns, values)
        return pid

    def remove(self, pid): 
        # Удаляет проект
        self._delete('id', pid)

    def find(self, pid):
        # Возвращает проект
        return self._find('id', pid)



Чтобы выполнять команды в шелле из питона, воспользуемся модулем subprocess.
Почитать о нём на русском языке можно здесь.

src/buildserver/app/cmd.py (+/-)

import os
import subprocess


class Cmd(object):
    def __init__(self, cwd, logger):
        self.log_tpl = '[{cwd}] {msg}'
        self.cwd = cwd
        self.logger = logger

    def run(self, *args):
        process = subprocess.Popen(
            args,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=self.cwd,
            shell=False,
            env=self.env
        )
        self.info('Executing: {}'.format(' '.join(args)))
        out, err = process.communicate()
        if out:
            self.info(out)
        if err:
            self.warning(err)
        return process.returncode

    @property
    def env(self):
        return os.environ.copy()

    def log(self, msg, level):
        try:
            getattr(self.logger, level)(self.log_tpl.format(cwd=os.path.basename(self.cwd), msg=msg))
        except UnicodeDecodeError:
            pass

    def info(self, msg):
        self.log(msg, 'info')

    def warning(self, msg):
        self.log(msg, 'warning')


Если вы читали материал, приведённый по ссылке выше, то никаких вопросов возникнуть не должно.

Настройки приложения будут располагаться в src/buildserver/app/settings.py:

import os

# Режим отладки
DEBUG = True
# DSN для подключения к БД
PG_DSN = 'postgresql://kilte:1234@localhost/buildserver'

# Адрес, на который завязывается ZMQ сокет для отправки сообщения о том, что нужно начать сборку проекта
TASK_NEW_PUBLISHER = 'tcp://127.0.0.1:8000'
# Адрес, на который завязывается ZMQ сокет для отправки сообщения о том, что сборка завершена
TASK_COMPLETE_PUBLISHER = 'tcp://127.0.0.1:8001'

# Адрес и порт для tornado приложения
BROADCAST = {
    'address': '127.0.0.1',
    'port': 8888
}

# Путь к корневому директорию проекта
ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../'))
# Путь к логам сборок
LOGS_PATH = os.path.join(ROOT_PATH, 'logs')
# Путь к директорию, в который будут клонироваться проекты
BUILDS_PATH = os.path.join(ROOT_PATH, 'builds')


Создаём директории logs и builds в корне проекта.

Было бы неплохо иметь централизованный доступ к логам сборок.
src/buildserver/app/log.py (+/-)

import os
import logging
import shutil

from .settings import LOGS_PATH


def get_logger(name, path=None):
    # Возвращает сконфигурированный логгер
    logger = logging.getLogger(name)
    formatter = logging.Formatter('%(levelname)s [%(name)s/%(asctime)s]: %(message)s')  # Формат сообщения
    if not path:
        path = os.path.join(LOGS_PATH, '{}.log'.format(name))
    handler = logging.FileHandler(path)  # Обработчик, который будет писать сообщения в файл
    logger.addHandler(handler)
    handler.setFormatter(formatter)
    logger.setLevel(logging.INFO)  # Задаём уровень логгирования
    return logger


def get_project(pid):
    # Возвращает путь к директорию с логами для конкретного проекта
    return os.path.join(LOGS_PATH, 'builds', str(pid))


def create_project(pid):
    # Создаёт директорий для логов проекта
    path = get_project(pid)
    if not os.path.exists(path):
        os.makedirs(path)
    return path


def remove_project(pid):
    # Удаляет директорий для логов проекта
    shutil.rmtree(get_project(pid), True)


def get_build(pid, bid, open_=False):
    # Считывает лог сборки. Если значение аргумента open_ является True, вернёт объект файла
    path = os.path.join(LOGS_PATH, 'builds', str(pid), '{}.log'.format(bid))
    if os.path.exists(path):
        if open_:
            return open(path)
        with open(path) as f:
            log = f.read()
    else:
        if open_:
            raise OSError('File {} does not exists'.format(path))
        log = 'Unable to find log for this build {}'.format(path)
    return log



Теперь, чтобы можно было импортировать только что созданные модули, необходимо сказать питону, что src/buildserver/app является пакетом.
Для этого просто создаём пустой файл с именем __init__.py в этом директории.

REST API

Читаем про REST https://ru.wikipedia.org/wiki/REST
Еще можно здесь почитать: http://eax.me/rest/
Ну и это: http://habrahabr.ru/post/181988/

API приложения будет выглядеть примерно следующим образом:

HTTP Метод | URL | Описание

GET /api/v1/projects - Получить список проектов
POST /api/v1/projects - Создать проект
GET /api/v1/projects/<pid> - Получить данные конктретного проекта
PUT /api/v1/projects/<pid> - Обновить проект
DELETE /api/v1/projects/<pid> - Удалить проект

POST /api/v1/projects/<pid>/build - Начать сборку проекта
GET /api/v1/projects/<pid>/builds/<bid> - Получить информацию о сборке

Переходим к реализации.

src/buildserver/web.py (+/-)

import psycopg2
import zmq
from datetime import datetime
from functools import wraps
from werkzeug.exceptions import default_exceptions
from werkzeug.exceptions import HTTPException
from flask import Flask, g, jsonify, request, abort
from .app import log, settings
from .app.repositories import Builds, Projects


def make_app(import_name, **kwargs):
    def error_handler(ex): # Позволяет отдавать сообщения об ошибках в json формате.
        response = jsonify(message=str(ex))
        response.status_code = (ex.code if isinstance(ex, HTTPException) else 500)
        return response

    application = Flask(import_name, **kwargs)

    for code in default_exceptions.iterkeys():
        application.error_handler_spec[None][code] = error_handler

    return application


app = make_app(__name__)
app.debug = settings.DEBUG
ZMQContext = zmq.Context()


def run():
    app.run()


@app.before_request
def setup_request():
    # Соединяемся с базой и создаём экземпляры репозиториев при каждом запросе
    g.db = psycopg2.connect(settings.PG_DSN)
    g.projects = Projects(g.db)
    g.builds = Builds(g.db)


@app.teardown_request
def teardown_request(exception):
    # Закрываем соединение с базой
    db = getattr(g, 'db', None)
    if db is not None:
        db.close()


@app.route('/api/v1/projects', methods=['GET', 'POST'])
def projects():
    if request.method == 'GET':
        # Возвращаем список проектов
        return jsonify(items=g.projects.all())
    elif request.method == 'POST':
        # Создаём проект
        validate_project()
        return jsonify(id=g.projects.save(**request.json))


# Декоратор, позволяющий проверить наличие проекта в базе
def requires_project(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        if 'project' in kwargs:
            project = g.projects.find(kwargs['project'])
            if not project:
                abort(404)
            kwargs['project'] = project
        # Вызываем обёрнутый контроллёр
        return func(*args, **kwargs)
    return wrapper


# Конвертирует временную метоку в человекочитаемую дату
def timestamp_to_date(timestamp, fmt='%Y-%m-%d / %H:%M:%S'):
    return datetime.fromtimestamp(timestamp).strftime(fmt)


# Форматирует сборку для отдачи её клиенту
def format_build(build_data):
    build_data['start_date'] = timestamp_to_date(build_data['start_date'])
    if build_data['finish_date']:
        build_data['finish_date'] = timestamp_to_date(build_data['finish_date'])
    else:
        build_data['finish_date'] = ''
    return build_data


@app.route('/api/v1/projects/<project>', methods=['GET', 'PUT', "DELETE"])
@requires_project
def manage_project(project):
    if request.method == 'GET':
        # Возвращает данные проекта
        return jsonify(project=project, builds=map(format_build, g.builds.all(project['id'])))
    elif request.method == 'PUT':
        # Обновляет данные проекта
        validate_project()
        g.projects.save(pid=project['id'], **request.json)
        return jsonify(id=project['id'])
    elif request.method == 'DELETE':
        # Удаляет проект
        g.builds.remove(project['id'])
        g.projects.remove(project['id'])
        log.remove_project(project['id'])
        return jsonify(id=project['id'])


def validate_project():
    # Валидация данных проекта для создания/обновления
    if not request.json:
        abort(400)
    keys = sorted(('name', 'description', 'url'))
    if sorted(request.json.keys()) != keys:
        abort(400)
    for key in keys:
        if not request.json[key]:
            abort(400)


@app.route('/api/v1/projects/<project>/build', methods=['POST'])
@requires_project
def builds(project):
    # Добавляет проект в очередь на сборку
    socket = ZMQContext.socket(getattr(zmq, 'PUSH'))
    socket.bind(settings.TASK_NEW_PUBLISHER)
    socket.send_json({'action': 'build', 'project_id': project['id']})
    return jsonify(msg='Build was added to queue')


@app.route('/api/v1/projects/<project>/builds/<bid>', methods=['GET'])
@requires_project
def get_build(project, bid):
    # Возвращает информацию о проекте и сборке
    build = g.builds.find(bid)
    if not build:
        abort(404)
    build = format_build(build)
    if build['state'] != 'running':
        build['log'] = log.get_build(project['id'], build['id'])
    return jsonify(project=project, build=build)



REST API готово. Переходим к воркеру.
Но для начала давайте немного поиграемся с pyzmq.
Создадим в корне проекта два файла producer.py и publisher.py

producer:

import zmq

context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.connect('tcp://127.0.0.1:8000')

while True:
    print socket.recv_json()


publisher:

import zmq

context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind('tcp://127.0.0.1:8000')

for i in range(0, 10):
    socket.send_json({'id': i})


Запускаем:

$ bin/py publisher.py
$ bin/py producer.py

На выходе получим:
{u'id': 0}
{u'id': 1}
{u'id': 2}
{u'id': 3}
{u'id': 4}
{u'id': 5}
{u'id': 6}
{u'id': 7}
{u'id': 8}
{u'id': 9}


Круто, не правда ли?
Удаляем publisher.py и producer.py, они нам больше не пригодятся.

Создаём воркер.

src/buildserver/worker.py (+/-)

import os
import psycopg2
import shutil
import traceback
import uuid
import zmq

from datetime import datetime
from .app import log, settings
from .app.cmd import Cmd
from .app.repositories import Builds, Projects


class Worker(object):

    def __init__(self, projects, builds, zmq_context, logger):
        self.projects = projects  # Репозиторий для работы с проектами
        self.builds = builds  # Репозиторий для работы со сборками
        self.logger = logger  # Логгер для воркера

        # ZMQ сокет, от которого будем узнавать, что нужно собрать проект
        self.listener = zmq_context.socket(getattr(zmq, 'PULL'))
        self.listener.connect(settings.TASK_NEW_PUBLISHER)
        self.logger.info('Connected to {}'.format(settings.TASK_NEW_PUBLISHER))

        # ZMQ сокет, который будет сообщать о том, что сборка проекта завершена
        self.publisher = zmq_context.socket(getattr(zmq, 'PUB'))
        self.publisher.bind(settings.TASK_COMPLETE_PUBLISHER)
        self.logger.info('Bind to {}'.format(settings.TASK_COMPLETE_PUBLISHER))

    def validate_message(self, message):
        # Валидация сообщения, пришедшего от self.listener
        if not isinstance(message, dict):
            raise ValueError('Wrong message format')
        if 'action' not in message or 'project_id' not in message:
            raise ValueError('Wrong message format')
        project = self.projects.find(message['project_id'])
        if not project:
            raise RuntimeError('Project {} does not exists'.format(message['project_id']))
        return message['action'], project

    def action_build(self, project):
        # Осуществляет сборку проекта

        # Создаём билд в базе
        build_id = self.builds.start(project['id'])
        # Получаем путь к логам проекта
        logs_path = log.create_project(project['id'])
        # Создаём логгер для текущей сборки
        l = log.get_logger('#{}-{}'.format(project['id'], build_id), os.path.join(logs_path, str(build_id) + '.log'))
        l.info('Started build for project {}'.format(build_id, project['id']))

        try:
            l.info('Cloning repository from {}...'.format(project['url']))
            # Создаём директорий, куда будет клонироваться проект
            directory = os.path.join(settings.BUILDS_PATH, str(uuid.uuid4()).replace('-', ''))
            os.makedirs(directory)
            try:
                cmd = Cmd(directory, l)
                # Клонируем проект
                code = cmd.run('git', 'clone', '--recursive', '--quiet', project['url'], '.')
                if code != 0:
                    raise RuntimeError('Cloning failed')
                # Читаем конфиг
                with open(os.path.join(directory, '.buildserver')) as config:
                    commands = config.read().strip().split('\n')
                # Выполняем команды из конфига
                for command in commands:
                    try:
                        cmd.run(*command.split(' '))
                    except Exception as exc:
                        raise RuntimeError('Exception has occurred while running command "{}": {}'.format(
                            command,
                            str(exc)
                        ))
            finally:
                # Удаляем директорий, куда был склонирован проект
                shutil.rmtree(directory)
            l.info('Build finished!')
            # Обновляем билд в базе
            self.builds.finish(build_id, 'success')
            # Отправляем сообщение по zmq о том, что сборка завершена успешно
            self.publisher.send_json({'build_id': build_id, 'state': 'success'})
        except Exception as exc:
            # Произошла какая-то ошибка, пишем её в лог.
            l.error('{}\n{}'.format(str(exc), traceback.format_exc()))
            l.error('Build failed!')
            # Обновляем билд в базе
            self.builds.finish(build_id, 'failed')
            # Отправляем сообщение по zmq о том, что сборка провалилась
            self.publisher.send_json({'build_id': build_id, 'state': 'failed'})

# Дальше всё и без комментариев должно быть понятно

    def run(self):
        while True:
            try:
                msg = self.listener.recv_json()
                self.logger.info('Received message: {}'.format(msg))
                action, project = self.validate_message(msg)
                getattr(self, 'action_{}'.format(action))(project)
            except Exception as e:
                self.logger.error('{}:\n{}'.format(str(e), traceback.format_exc()))


def run():
    connection = psycopg2.connect(settings.PG_DSN)
    Worker(
        Projects(connection),
        Builds(connection),
        zmq.Context(),
        log.get_logger('worker-{}-{}'.format(
            datetime.now().strftime('%Y-%m-%d'),
            str(uuid.uuid4()).replace('-', '')[0:10])
        )
    ).run()



Теперь необходимо сделать отображение лога сборки в режиме реального времени.
src/buildserver/broadcast.py (+/-)

import json
import signal
import time
import tornado.ioloop
import tornado.httpserver
import uuid
import zmq

from psycopg2.pool import SimpleConnectionPool
from tornado.ioloop import PeriodicCallback
from tornado.web import Application as WebApplication
from tornado.websocket import WebSocketHandler as BaseWebSocketHandler
from zmq.eventloop import ioloop as zmq_ioloop
from zmq.eventloop.zmqstream import ZMQStream
from .app import log, settings
from .app.repositories import Builds


# Пул соединений с БД. Не знаю, на сколько это решение верное, но для примера вполне норм.
pool = SimpleConnectionPool(5, 20, settings.PG_DSN)


# Запускает HTTPServer и позволяет остановить его с помощью UNIX сигналов SIGTERM и SIGINT
# https://ru.wikipedia.org/wiki/ ... IX%29
class Application(object):
    def __init__(self, app, address, port):
        self.http_server = tornado.httpserver.HTTPServer(app)
        self.address = address
        self.port = port

    def sig_handler(*args):
        tornado.ioloop.IOLoop.instance().add_callback(args[0].shutdown)

    def shutdown(self):
        self.http_server.stop()
        io_loop = tornado.ioloop.IOLoop.instance()
        io_loop.add_timeout(time.time() + 2, io_loop.stop)

    def run(self):
        self.http_server.listen(self.port, self.address)
        signal.signal(signal.SIGTERM, self.sig_handler)
        signal.signal(signal.SIGINT, self.sig_handler)
        tornado.ioloop.IOLoop.instance().start()


# Обработчик WebSocket соединений
class WebSocketHandler(BaseWebSocketHandler):
    connections = {}  # Контейнер для соединений

    def __init__(self, *args, **kwargs):
        super(WebSocketHandler, self).__init__(*args, **kwargs)
        self.sess_id = str(uuid.uuid4())  # Идентификатор соединения
        self.db = pool.getconn()  # Получаем подключение к БД из пула
        self.builds = Builds(self.db)  # Репозиторий для работы со сборками
        self.connections[self.sess_id] = {'handler': self, 'watchers': {}}  # Добавляем текущее соединение в контейнер

    def on_message(self, message):
        # Валидация сообщения
        # Сообщение должно быть вида {'action': 'some_action', 'params': {}}
        # Опция params не является обязательной
        try:
            if not message:
                raise ValueError('Empty message')
            message = json.loads(message)
            if 'action' not in message:
                raise ValueError('Action is not specified')
            params = message.get('params', {})
            if not isinstance(params, dict):
                raise ValueError('Wrong params')
        except ValueError as e:
            self.write_message({'error': str(e)})
            return

        if message['action'] == 'subscribe':
            # Подписываемся на сборку
            if 'build_id' not in params:
                raise ValueError('Wrong params')
            build = self.builds.find(params['build_id'])
            if not build:
                self.write_message({'error': 'Build #{} does not exists'.format(params['build_id'])})
                return
            if build['state'] != 'running':
                return
            # Добавляем наблюдателя, который будет слать лог клиенту
            self.connections[self.sess_id]['watchers'][build['id']] = Watcher(self, build)
        elif message['action'] == 'unsubscribe':
            # Отписываемся от всех сборок
            for build in self.connections[self.sess_id]['watchers'].keys():
                self.connections[self.sess_id]['watchers'].pop(build).stop()
        else:
            self.write_message({'error': 'Unknown action'})

    def on_close(self):
        # Незабываем вернуть соединение в пул при закрытии WebSocket соединения
        pool.putconn(self.db)
        self.db = None
        self.builds = None
        del self.connections[self.sess_id]

    @classmethod
    def on_zmq_message(cls, msg):
        # Слушаем сообщения от воркера
        msg = json.loads(msg[0])
        for connection in cls.connections.values():
            for build in connection['watchers'].keys():
                if build == msg['build_id']:
                    connection['watchers'].pop(build).stop()
                    connection['handler'].write_message({'action': 'build_finished', 'params': msg})


class Watcher(object):
    def __init__(self, handler, build):
        self.handler = handler  # WebSocket соединение
        self.log_file = log.get_build(build['project_id'], build['id'], True)  # Лог сборки
        self.timer = PeriodicCallback(self.callback, 500)  # Таймер, который выполняет коллбэк с интервалом в 500 миллисекунд
        self.timer.start()

    def stop(self):
        self.timer.stop()
        self.log_file.close()

    def callback(self):
        msg = self.log_file.read()
        if msg:
            # Шлём лог клиенту
            self.handler.write_message({'action': 'build_log', 'params': {'line': msg}})


def run():
    # Связываем tornado ioloop с zmq
    zmq_ioloop.install()

    # Подписываемся на сообщения от воркера
    context = zmq.Context()
    socket = context.socket(getattr(zmq, 'SUB'))
    socket.setsockopt(getattr(zmq, 'SUBSCRIBE'), '')
    socket.connect(settings.TASK_COMPLETE_PUBLISHER)
    stream = ZMQStream(socket)
    stream.on_recv(WebSocketHandler.on_zmq_message)
    
    app = Application(
        WebApplication([(r"/broadcast/", WebSocketHandler)]),
        settings.BROADCAST['address'],
        settings.BROADCAST['port']
    )
    app.run()



Ну вот, как-то так.
Остаётся сделать фронтенд и научиться запускать всё это дело.