Asynchronous tests with GPars (Osoco test gallery, part 4)

If you have written multithreaded code in Groovy or Grails, you have probably come across the GPars library. It simplifies parallel processing: sometimes it is as easy as wrapping a code fragment in a closure, passing that closure to a GPars pool, and changing the iteration method (in our example, from each to eachParallel). The library takes care of creating the executor pool and adds the new methods to collections.

Whenever I deliberately add multithreading, I separate two responsibilities: processing logic and concurrency handling. I wrap the single-threaded class in a new one that handles the parallel execution (creating a pool, managing threads, handling cancellation, etc.):

import groovyx.gpars.GParsPool
import org.codehaus.groovy.grails.commons.ConfigurationHolder

class MultithreadedProcessorService {
    ProcessorService processorService
    def threadPoolSize = ConfigurationHolder.config.threadPool.size

    void processMultithreaded(batches) {
        GParsPool.withPool(threadPoolSize) {
            batches.eachParallel { batch ->
                processorService.process(batch)
            }
        }
    }
}

class ProcessorService {
    void process(batch) {
        // some processing
    }
}
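
To get a feel for what withPool and eachParallel take off your hands, here is a rough sketch of the same "one task per batch on a fixed-size pool" idea written against plain java.util.concurrent. It is not how GPars is implemented, and processInParallel is a hypothetical helper name used only for this illustration:

```groovy
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executors

// Hypothetical helper (not part of GPars): runs `work` once per item on a fixed-size pool
def processInParallel(Collection items, int poolSize, Closure work) {
    def pool = Executors.newFixedThreadPool(poolSize)
    try {
        // curry(item) yields a no-argument closure; Groovy closures implement Callable
        def tasks = items.collect { item -> work.curry(item) }
        // invokeAll blocks until every task has finished;
        // get() rethrows a task failure wrapped in an ExecutionException
        pool.invokeAll(tasks).each { it.get() }
    } finally {
        // Always release the threads, even if a task failed
        pool.shutdown()
    }
}

// Usage: record processed batches in a thread-safe queue
def processed = new ConcurrentLinkedQueue()
processInParallel([[0, 1], [2, 3, 4], [5]], 2) { batch -> processed << batch }
assert processed.size() == 3
```

Pool creation, task submission, waiting, and shutdown all have to be spelled out by hand here, which is exactly the plumbing the wrapper class above delegates to GPars.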

To test the multithreaded class, you can use Spock interactions. Unlike Groovy and Grails mocks, they are thread-safe, so you won’t get weird, unstable results:

import grails.plugin.spock.UnitSpec

class MultithreadedProcessorServiceSpec extends UnitSpec {
    private MultithreadedProcessorService multithreadedProcessorService
    // Using Spock mocks because they are thread-safe
    private ProcessorService processorService = Mock(ProcessorService)

    def setup() {
        mockConfig('threadPool.size=2')
        multithreadedProcessorService = new MultithreadedProcessorService(
            processorService: processorService
        )
    }

    def 'processes batches using multiple threads'() {
        given:
        def batches = [[0, 1], [2, 3, 4], [5]]
        def processedBatches = markedAsNotProcessed(batches)

        when:
        multithreadedProcessorService.processMultithreaded(batches)

        then:
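        // Expect exactly one call per batch. The closure is an argument constraint:
        // it marks the batch as processed and checks that it is one of the inputs.
        batches.size() * processorService.process({ batch ->
            processedBatches[batch] = true
            batch in batches
        })
        assertAllProcessed(processedBatches)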
    }

    private markedAsNotProcessed(batches) {
        batches.inject([:]) { processed, batch ->
            processed[batch] = false
            processed
        }
    }

    private void assertAllProcessed(processedBatches) {
        // Every entry must have been flipped from false to true
        assert processedBatches*.value == [true] * processedBatches.size()
    }
}
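
To make the thread-safety requirement on the test double concrete, here is a minimal sketch of a hand-written fake that records calls in a concurrent collection. RecordingProcessor is a hypothetical class invented for this illustration, and the plain executor stands in for the GPars pool; the Spock mock in the spec above does this bookkeeping for you:

```groovy
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical hand-written fake; stands in for ProcessorService in this sketch only
class RecordingProcessor {
    // CopyOnWriteArrayList tolerates concurrent add() calls without external locking
    final List received = new CopyOnWriteArrayList()

    void process(batch) {
        received << batch
    }
}

def fake = new RecordingProcessor()
def batches = [[0, 1], [2, 3, 4], [5]]

// Call the fake from two worker threads, one task per batch
def pool = Executors.newFixedThreadPool(2)
batches.each { batch -> pool.execute { fake.process(batch) } }
pool.shutdown()
assert pool.awaitTermination(5, TimeUnit.SECONDS)

// Arrival order is not deterministic, so compare size and membership only
assert fake.received.size() == batches.size()
assert fake.received.containsAll(batches)
```

With a plain ArrayList in place of the concurrent list, concurrent add() calls could lose entries, which is the kind of unstable result mentioned above.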

This series of posts presents the best of Osoco’s tests: tests that were tricky or that we are simply proud of. You can find runnable source code for this test and more in the Grails Test Gallery project shared on GitHub.
