As part of my PhD, I recently created a CSP (Communicating Sequential Processes) framework for Ruby. It can be found on GitHub as omegahm/emit. I still need to write a proper README for that project, but what it lets you do is communicate values between processes (implemented as Ruby fibers) via channels. All of this can be verified to work as expected using CSP algebra, or maybe even FDR.
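To give a feel for what handing values between fibers looks like, here is a minimal sketch that uses only Ruby's built-in Fiber class. It is not how Emit is implemented, and the `:retired` marker and the variable names are assumptions made purely for illustration:

```ruby
# A fiber that plays the producer: each Fiber.yield hands one value to
# whoever resumes the fiber, then suspends until the next resume.
producer = Fiber.new do
  [1000, 1000, 1000].each { |bagsize| Fiber.yield bagsize }
  :retired # hypothetical end marker: nothing more will be sent
end

# The main program plays the consumer and resumes the producer on demand.
received = []
loop do
  value = producer.resume
  break if value == :retired
  received << value
end

p received
```

Only one side runs at any moment, and control switches exactly at the points where a value is handed over.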
An example of Emit code could be:
require "emit"

# Sends `bags` jobs, each asking for `bagsize` samples, then retires the channel.
def producer(job_out, bagsize, bags)
  bags.times { job_out << bagsize }
  Emit.retire(job_out)
end

# Reads a sample count, estimates pi from that many random points, sends the estimate on.
def worker(job_in, result_out)
  loop do
    cnt = job_in.()
    sum = cnt.times.count { (rand**2 + rand**2) < 1 }
    result_out << (4.0 * sum) / cnt
  end
rescue Emit::ChannelRetiredException
  Emit.retire(result_out)
end

# Keeps a running mean of all estimates (held in `sum`) and prints it at the end.
def consumer(result_in)
  cnt = 0
  sum = result_in.()
  loop do
    cnt += 1
    sum = (sum * cnt + result_in.()) / (cnt + 1)
  end
rescue Emit::ChannelRetiredException
  puts sum
end

jobs    = Emit.channel
results = Emit.channel

t1 = Time.now

# As used here, -channel is the writing end and +channel the reading end.
Emit.parallel(
  Emit.producer(-jobs, 1000, 10000),
  10.times.map { Emit.worker(+jobs, -results) },
  Emit.consumer(+results)
)

t2 = Time.now
puts "Total time elapsed = %.6fs" % (t2 - t1)
This code creates 10 worker processes that share the 10000 jobs of size 1000 to calculate π. Note that even though the module method is called parallel, the processes are not run in parallel in the sense of executing simultaneously on the CPU; they are parallel only in the sense of CSP algebra. An obvious extension to the gem is to make it possible to run the processes truly in parallel, so that they only wait on communication.
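For anyone wondering what the worker and consumer actually compute, the arithmetic can be tried out without Emit. The sketch below is plain Ruby; the helper names `estimate_pi` and `update_mean` and the fixed seed are my own assumptions for illustration and are not part of the gem:

```ruby
# Estimate pi from `samples` random points in the unit square: the fraction
# that lands inside the quarter circle approaches pi / 4.
def estimate_pi(samples, rng = Random.new)
  hits = samples.times.count { rng.rand**2 + rng.rand**2 < 1 }
  4.0 * hits / samples
end

# Fold one new value into the mean of `count` earlier values.
def update_mean(mean, count, value)
  (mean * count + value) / (count + 1)
end

rng = Random.new(42) # fixed seed so repeated runs give the same numbers
estimates = Array.new(100) { estimate_pi(1000, rng) }

# Same running-mean update as in the consumer above.
mean = estimates.first
estimates.drop(1).each_with_index do |value, index|
  mean = update_mean(mean, index + 1, value)
end

puts mean
```

The incremental update gives the same result as summing all estimates and dividing by their number, but it never needs to hold more than one value at a time, which is why it suits a consumer reading from a channel.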