воскресенье, 7 ноября 2010 г.

Именованные каналы UNIX и синхронизация процессов

Почему-то не нашел примеров использования именованных каналов для синхронизации процессов. С их помощью можно легко написать скрипт для распараллеливания задач - просто на bash (например, для кодирования видео или архивации большого кол-ва файлов в несколько потоков, чтобы использовать все имеющиеся в наличии ядра CPU)!
В демонстрационных целях распараллеливать буду очередь из 10 команд sleep <случайное число> :))

#!/bin/bash

PipeFile="./task4all.pipe"
LogFile="./task4all.log"
MaxParallelTasks=3
TaskList=$(for i in {1..10} ; do echo $((RANDOM%12)) ; done); 

ParallelTasks=0

#Проверяем, нет ли уже созданного именованного канала, если нет - создаем
[ -p $PipeFile ] || mkfifo $PipeFile

#Основной цикл по задачам
for task in $TaskList ; do
#Если максимальное число потоков достигнуто - ждем завершения одного или нескольких
#из работающих процессов (или просто читаем сообщения о завершении работы,
#которые могли накопиться в канале).
 if [ $ParallelTasks -eq $MaxParallelTasks ] ; then
  taskscompleted=$(wc -l $PipeFile | cut -d' ' -f 1)
  ParallelTasks=$(($ParallelTasks-$taskscompleted))
 fi
#Увеличиваем число работающих потоков на 1 и стартуем работу в новом экземпляре shell.
#По завершении работы, порожденный shell запишет сообщение об этом в именованный канал
#и дождется прочтения этого сообщения родительским скриптом.
 ((ParallelTasks++))
 echo "start:$(date +%d%m%Y.%H%M%S):$task" >> $LogFile &&
 sleep $task &&
 echo "finish:$(date +%d%m%Y.%H%M%S):$task" | tee -a $PipeFile >> $LogFile &
done
echo "$(date +%d%m%Y.%H%M%S):All tasks started!" | tee -a $LogFile

#Ждем завершения последних рабочих процессов
while [ ! $(grep -c 'start:' $LogFile) -eq $(grep -c 'finish:' $LogFile) ] ; do
 cat ./task4all.pipe > /dev/null
done

echo "$(date +%d%m%Y.%H%M%S):All tasks finished!" | tee -a $LogFile

#Удаляем канал
rm $PipeFile

Подробная информация об именованных каналах в UNIX:
http://developers.sun.com/solaris/articles/named_pipes.html
Особенно рекомендуется 10 строк раздела "Reading From and Writing to a Named Pipe"
Там содержатся основные замечания, которые надо иметь ввиду, используя named pipes для синхронизации.

2 комментария:

Анонимный комментирует...

только у меня не работает? и неудивительно, т.к. wc -l собирает всю очередь в канале. Т.е. в итоге все задачи запускаются без ожидания завершения предыдущих внутри разрешенного максимума

Анонимный комментирует...

учи матчасть. не знаю что там не правильно, вот стопудов рабочий вариант


#!/bin/bash

FinishedTasksPipe="./task4all.pipe"
LogFile="./task4all.log"
TasksTotal=10
MaxParallelTasks=3
ParallelTasks=0

#TasksList=$(while [ $(( TasksTotal-- )) -ne 0 ];do echo $((RANDOM%12)); done)
TasksList=$(while [ $(( TasksTotal-- )) -ne 0 ];do echo "$(dd if=/dev/urandom bs=512 count=1 2>/dev/null | strings | wc -c)"; done)

[ -p $FinishedTasksPipe ] || mkfifo $FinishedTasksPipe
trap "rm -v $FinishedTasksPipe; truncate -s 0 $LogFile" EXIT

for i in $TasksList; do
echo "[$(date +'%M:%S')] t$i started (running:$ParallelTasks)" | tee -a $LogFile && sleep ${i}s &&
echo "[$(date +'%M:%S')] t$i finished" | tee -a $FinishedTasksPipe >> $LogFile &
[ $((++ParallelTasks)) -ge $MaxParallelTasks ]&& {
echo "[$(date +'%M:%S')]queue bank is full ($ParallelTasks). wait till at least one task to be completed..." | tee -a $LogFile
taskscompleted=$(wc -l $FinishedTasksPipe| cut -d" " -f 1)
echo "[$(date +'%M:%S')] released $taskscompleted tasks from pipe finished queue" >> $LogFile
(( ParallelTasks-=taskscompleted ))
}
done

echo "[$(date +'%M:%S')] All tasks started! Soon they will be finished" | tee -a $LogFile


echo waiting till all tasks will be finished..
while [ ! $(egrep -c 't[0-9]+ started' $LogFile) -eq $(grep -Ec 't[0-9]+ finished' $LogFile) ] ; do
cat $FinishedTasksPipe
done