All Projects → keybase → pipeliner

keybase / pipeliner

Licence: BSD-3-Clause license
A simplified pipline library, for parallel requests with bounded parallelism

Programming Languages

go
31211 projects - #10 most used programming language

pipeliner

Build Status GoDoc

A simplified pipline library, for parallel requests with bounded parallelism.

Getting

go get github.com/keybase/pipeliner

Background

Often you want do network requests with bounded parallelism. Let's say you have 1,000 DNS queries to make, and don't want to wait for them to complete in serial, but don't want to blast your server with 1,000 simultaneous requests. In this case, bounded parallelism makes sense. Make 1,000 requests with only 10 outstanding at any one time.

At this point, I usually Google for it, and come up with this blog post, and I become slightly sad, because that is a lot of code to digest and understand to do something that should be rather simple. It's not really the fault of the language, but more so the library. Here is a library that makes it a lot easier:

Example

import (
	"context"
	"github.com/keybase/pipeliner"
	"sync"
	"time"
)

// See example_request_test.go for a runnable example.

type Request struct{ i int }
type Result struct{ i int }

func (r Request) Do() (Result, error) {
	time.Sleep(time.Millisecond)
	return Result{r.i}, nil
}

// makeRequests calls `Do` on all of the given requests, with only `window` outstanding
// at any given time. It puts the results in `results`, and errors out on the first
// failure.
func makeRequests(ctx context.Context, requests []Request, window int) (results []Result, err error) {

	var resultsLock sync.Mutex
	results = make([]Result, len(requests))

	pipeliner := pipeliner.NewPipeliner(window)

	worker := func(ctx context.Context, i int) error {
		res, err := requests[i].Do()
		resultsLock.Lock()
		results[i] = res
		resultsLock.Unlock()
		return err // the first error will kill the pipeline
	}

	for i := range requests {
		err := pipeliner.WaitForRoom(ctx)
		if err != nil {
			return nil, err
		}
		go func(i int) { pipeliner.CompleteOne(worker(ctx, i)) }(i)
	}
	return results, pipeliner.Flush(ctx)
}
Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].