По следам прошлого поста о функции Map.

Вам дали задачу – смоделировать процесс приготовлении картошки. Имеем набор картофелин. Каждая должна пройти стадию чистки, мойки, резки и варки.

Не вопрос, отвечет программист:

class Potato:

    def clean(self):
        pass

    def wash(self):
        pass

    def cut(self):
        pass

    def cook(self):
        pass

potatos = [Potato(), Potato(), ...]

for p in potatos:
    p.clean()
    p.wash()
    p.cut()
    p.cook()

Вроде ОК? Нет, все развалится.

Сперва попросят добавить временные задержки. Чистка – 10 секунд, мойка и резка – 5 секунд, варка – 15 минут (900 секунд). Впишем time.sleep(N) в нужные методы.

    ...
    def clean(self):
        time.sleep(10)
    ...

Запустим код и убедимся, что 10 картошек варятся 150 минут! Конечно, ведь каждая картошка обрабатывается отдельно. Мы варим картошку поштучно. Так никто не делает.

Выходит, варить картошку в цикле нельзя. Должен быть особый код, который сварит картошки вне цикла. Класс картофелины не должен знать о технике варки. Значет, будет некий менеджер CookManager.cook(potatos).

Теперь скажут, что картошку готовы чистить два человека. В цикле выше это никак не предусмотрено. Придется заворачивать цикл в функцию и передавать часть списка:

def process(potatos):
    for p in potatos:
        ...

potatos = [Potato(), Potato(), ...]

process(potatos[:n])
process(potatos[n:])

CookManager.cook(potatos)

Поделили. Но код никак не учитывает, что в процессе чистки может подключиться еще один человек. Или наоборот, уйти. Перестроить списки в процессе невозможно.

Есть проблема серьезней: процесс мойки делить между людьми нельзя. Потому что мойка одна. Это значит, нужно выдергивать из цикла операцию p.wash(), и как-то все перетасовать. И не забыть про варку в конце.

Короче, я к тому, что нужно сразу делать правильно:

class Potato:
    ...

potatos = [...]

map(Potato.clean, potatos)
map(Potato.wash, potatos)
map(Potato.cut, potatos)
map(Potato.cook, potatos)

Правильно – значит, оставлять потенциал для абстракций. Пока что код выше технически не отличается от цикла. Картошка тоже вариться 150 минут. Но это легко исправить:

def map_threaded(func, seq):
    from multiprocessing.dummy import Pool
    pool = Pool(processes=len(seq))
    return pool.map(func, *seq)

...

map_threaded(Potato.cook, potatos)

Здесь мы определяем мап в тредах. Число тредов равно числу картошек. Это значит, все они будут готовиться параллельно, как в реальности в одной кастрюле. Питонячьи треды переключаются при time.sleep(). Засыпание равносильно IO-ожиданию. Поэтому все картошки приготовятся за 15 минут.

Теперь чистка. Это будет мап на очередях. Воркеры – люди. Конфигурация очереди никак не влияет на код, поэтому можем подключить два, три и более воркеров со своими приоритетами и весами. Захотим, отключим воркер в процессе работы. Мап будет примерно такой:

def map_queued(func, seq):
    task = celery.task(func)(<task options...>)
    res_id_list = map(task.apply_async, seq)
    return map(task.get_result, res_id_list)

map_queued(Potato.clean, potatos)

Мойку параллелить нельзя, оставляем обычный мап. Все вместе:

class Potato:
    ...

potatos = [...]

map_queued(Potato.clean, potatos)
map(Potato.wash, potatos)
map_queued(Potato.cut, potatos)
map_threaded(Potato.cook, potatos)

Смысловая часть кода осталась неизменной. Мы просто поменяли один мап на другой. Всегда можем вернуть старый мап, если что-то пойдет не так. Функции map, filter, reduce, eval, apply – фундамент абстракций. Абстракции делают код открытым к изменеиям.

В идеале следует избавиться от классов, чтобы не хранить состояние картошки. Заменить методы на функции и пробрасывать результат предыдущего мапа в следующий.