Bash Mkfifo Non Blocking Assignment

I sometimes end up with a bunch of output fifos from programs that run in parallel, and I would like to merge these fifos. The naïve solution is:
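(presumably something along these lines, with fifo1 and fifo2 as placeholder names)

    cat fifo1 fifo2 > merged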

But this requires the first fifo to be read to completion before the first byte is read from the second fifo, and that blocks the programs running in parallel.

Another way is:
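(perhaps running the readers concurrently into a shared output, again with placeholder names)

    (cat fifo1 & cat fifo2 & wait) > merged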

But this may mix the output, producing half-lines in the result.

When reading from multiple fifos, there must be some rule for merging them. Typically doing it on a line-by-line basis is enough for me, so I am looking for something that does:
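(an invocation roughly like this, where merge_lines is just a made-up name for the tool I am after)

    merge_lines fifo1 fifo2 fifo3 > merged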

which will read from all the fifos in parallel and merge the output one full line at a time.

I can see it is not hard to write that program. All you need to do is:

  1. open all fifos
  2. do a blocking select on all of them
  3. do a non-blocking read from the fifo that has data, appending to that fifo's buffer
  4. if the buffer contains a full line (or record) then print out the line
  5. if all fifos are closed/eof: exit
  6. goto 2
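
For reference, bash cannot express that select() loop directly, but one rough workaround is a background reader per fifo, relying on writes of a single short line to a pipe being atomic (lines have to stay under PIPE_BUF for this to hold):

    #!/bin/bash
    # merge-fifos.sh (sketch): one reader per fifo; each reader re-emits
    # complete lines with a single printf, so lines shorter than PIPE_BUF
    # are written atomically and never interleave mid-line.
    for fifo in "$@"; do
        while IFS= read -r line; do
            printf '%s\n' "$line"
        done < "$fifo" &
    done
    wait    # returns once every fifo has hit EOF

Run as ./merge-fifos.sh fifo1 fifo2 fifo3 > merged it reads all fifos in parallel; it handles lines but not the multi-line record case from step 4.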

So my question is not: can it be done?

My question is: Is it done already and can I just install a tool that does this?

Tags: merge, cat, fifo, parallel-processing

Redis provides multiple data structures, and lists are one of them. For lists, it provides blocking and non-blocking operations that act on both the head and the tail of the list. Using these primitives, we're going to implement the reliable queue pattern [8] described in the Redis documentation. I also bumped into an error about connections [9], but was able to work around it.
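
As a quick illustration of those primitives (the commands are standard Redis commands; q and other are arbitrary key names used here for the example):

    redis-cli LPUSH q "a"         # push onto the head (non-blocking)
    redis-cli RPUSH q "b"         # push onto the tail (non-blocking)
    redis-cli RPOP q              # pop from the tail (non-blocking)
    redis-cli BRPOP q 5           # pop from the tail, blocking up to 5 seconds if q is empty
    redis-cli RPOPLPUSH q other   # atomically pop from q's tail and push onto other's head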

So in this instance we have two queues, q1 and q2. The producer puts messages on q1; the consumer atomically pops an element off q1 and pushes it onto q2. After the consumer has processed the message, it is deleted from q2. The second queue (q2) is used to recover from failures (network problems or consumer crashes): if messages carry a timestamp, their age can be measured, and if they sit in q2 for too long they can be moved back to q1 to be re-processed by a consumer (a sketch of that requeue step follows the consumer script below).

Flowchart

(producer LPUSHes messages onto q1; the consumer RPOPLPUSHes each message from q1 to q2, processes it, then LREMs it from q2)

Producer

    #!/bin/bash
    # Producer: clear both queues, then push $nmax messages onto the work queue.
    REDIS_CLI="redis-cli -h 127.0.0.1"
    n=1
    nmax=1000
    q1="queue"        # work queue
    q2="processing"   # processing queue (used by the consumer)

    clean() {
        # start from empty queues
        echo "DEL $q1" | $REDIS_CLI
        echo "DEL $q2" | $REDIS_CLI
    }

    produce() {
        while (($n <= $nmax)); do
            MSG="message $n"
            echo "LPUSH $q1 \"$MSG\"" | $REDIS_CLI
            n=$((n+1))
        done
    }

    clean
    produce

Consumer

    #!/bin/bash
    REDIS_CLI="redis-cli -h 127.0.0.1"
    q1="queue"
    q2="processing"

    # redis nil reply
    nil=$(echo -n -e '\r\n')

    consume() {
        while true; do
            # move message to processing queue
            MSG=$(echo "RPOPLPUSH $q1 $q2" | $REDIS_CLI)
            if [[ -z "$MSG" ]]; then
                break
            fi
            # processing message
            echo "$MSG"
            # remove message from processing queue
            echo "LREM $q2 1 \"$MSG\"" | $REDIS_CLI >/dev/null
        done
    }

    consume
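
The requeue step mentioned above is not shown in the scripts; a minimal sketch (call it requeue.sh) could look like this, assuming the producer is changed so every message starts with its creation time in epoch seconds (e.g. "1700000000 message 42"):

    #!/bin/bash
    # requeue.sh (sketch): move messages that have sat in the processing
    # queue for too long back onto the work queue.
    REDIS_CLI="redis-cli -h 127.0.0.1"
    q1="queue"
    q2="processing"
    max_age=60   # seconds a message may stay in q2 before being requeued

    now=$(date +%s)
    # list everything currently parked in the processing queue
    echo "LRANGE $q2 0 -1" | $REDIS_CLI | while IFS= read -r MSG; do
        ts=${MSG%% *}    # leading timestamp field
        if (( now - ts > max_age )); then
            # put it back on the work queue and drop it from q2
            echo "LPUSH $q1 \"$MSG\"" | $REDIS_CLI >/dev/null
            echo "LREM $q2 1 \"$MSG\"" | $REDIS_CLI >/dev/null
        fi
    done

This is not race-free: a message the consumer finishes right after the LRANGE can still be requeued once more, so consumers should be prepared to see a message delivered twice.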
