From: Arnaud Rebillout Date: Mon, 16 Sep 2019 04:18:11 +0000 (+0100) Subject: Import docker.io_18.09.9+dfsg1.orig-go-events.tar.xz X-Git-Tag: archive/raspbian/18.09.9+dfsg1-5+rpi1~1^2~30^4 X-Git-Url: https://dgit.raspbian.org/?a=commitdiff_plain;h=10cedece73ff9209fb1b28ee85f9d0017f40b79d;p=docker.io.git Import docker.io_18.09.9+dfsg1.orig-go-events.tar.xz [dgit import orig docker.io_18.09.9+dfsg1.orig-go-events.tar.xz] --- 10cedece73ff9209fb1b28ee85f9d0017f40b79d diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 00000000..d813af77 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,70 @@ +# Contributing to Docker open source projects + +Want to hack on go-events? Awesome! Here are instructions to get you started. + +go-events is part of the [Docker](https://www.docker.com) project, and +follows the same rules and principles. If you're already familiar with the way +Docker does things, you'll feel right at home. + +Otherwise, go read Docker's +[contributions guidelines](https://github.com/docker/docker/blob/master/CONTRIBUTING.md), +[issue triaging](https://github.com/docker/docker/blob/master/project/ISSUE-TRIAGE.md), +[review process](https://github.com/docker/docker/blob/master/project/REVIEWING.md) and +[branches and tags](https://github.com/docker/docker/blob/master/project/BRANCHES-AND-TAGS.md). + +For an in-depth description of our contribution process, visit the +contributors guide: [Understand how to contribute](https://docs.docker.com/opensource/workflow/make-a-contribution/) + +### Sign your work + +The sign-off is a simple line at the end of the explanation for the patch. Your +signature certifies that you wrote the patch or otherwise have the right to pass +it on as an open-source patch. The rules are pretty simple: if you can certify +the below (from [developercertificate.org](http://developercertificate.org/)): + +``` +Developer Certificate of Origin +Version 1.1 + +Copyright (C) 2004, 2006 The Linux Foundation and its contributors. +660 York Street, Suite 102, +San Francisco, CA 94110 USA + +Everyone is permitted to copy and distribute verbatim copies of this +license document, but changing it is not allowed. + +Developer's Certificate of Origin 1.1 + +By making a contribution to this project, I certify that: + +(a) The contribution was created in whole or in part by me and I + have the right to submit it under the open source license + indicated in the file; or + +(b) The contribution is based upon previous work that, to the best + of my knowledge, is covered under an appropriate open source + license and I have the right under that license to submit that + work with modifications, whether created in whole or in part + by me, under the same open source license (unless I am + permitted to submit under a different license), as indicated + in the file; or + +(c) The contribution was provided directly to me by some other + person who certified (a), (b) or (c) and I have not modified + it. + +(d) I understand and agree that this project and the contribution + are public and that a record of the contribution (including all + personal information I submit with it, including my sign-off) is + maintained indefinitely and may be redistributed consistent with + this project or the open source license(s) involved. +``` + +Then you just add a line to every git commit message: + + Signed-off-by: Joe Smith + +Use your real name (sorry, no pseudonyms or anonymous contributions.) + +If you set your `user.name` and `user.email` git configs, you can sign your +commit automatically with `git commit -s`. diff --git a/LICENSE b/LICENSE new file mode 100644 index 00000000..6d630cf5 --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2016 Docker, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/MAINTAINERS b/MAINTAINERS new file mode 100644 index 00000000..e414d82e --- /dev/null +++ b/MAINTAINERS @@ -0,0 +1,46 @@ +# go-events maintainers file +# +# This file describes who runs the docker/go-events project and how. +# This is a living document - if you see something out of date or missing, speak up! +# +# It is structured to be consumable by both humans and programs. +# To extract its contents programmatically, use any TOML-compliant parser. +# +# This file is compiled into the MAINTAINERS file in docker/opensource. +# +[Org] + [Org."Core maintainers"] + people = [ + "aaronlehmann", + "aluzzardi", + "lk4d4", + "stevvooe", + ] + +[people] + +# A reference list of all people associated with the project. +# All other sections should refer to people by their canonical key +# in the people section. + + # ADD YOURSELF HERE IN ALPHABETICAL ORDER + + [people.aaronlehmann] + Name = "Aaron Lehmann" + Email = "aaron.lehmann@docker.com" + GitHub = "aaronlehmann" + + [people.aluzzardi] + Name = "Andrea Luzzardi" + Email = "al@docker.com" + GitHub = "aluzzardi" + + [people.lk4d4] + Name = "Alexander Morozov" + Email = "lk4d4@docker.com" + GitHub = "lk4d4" + + [people.stevvooe] + Name = "Stephen Day" + Email = "stephen.day@docker.com" + GitHub = "stevvooe" diff --git a/README.md b/README.md new file mode 100644 index 00000000..0acafc27 --- /dev/null +++ b/README.md @@ -0,0 +1,117 @@ +# Docker Events Package + +[![GoDoc](https://godoc.org/github.com/docker/go-events?status.svg)](https://godoc.org/github.com/docker/go-events) +[![Circle CI](https://circleci.com/gh/docker/go-events.svg?style=shield)](https://circleci.com/gh/docker/go-events) + +The Docker `events` package implements a composable event distribution package +for Go. + +Originally created to implement the [notifications in Docker Registry +2](https://github.com/docker/distribution/blob/master/docs/notifications.md), +we've found the pattern to be useful in other applications. This package is +most of the same code with slightly updated interfaces. Much of the internals +have been made available. + +## Usage + +The `events` package centers around a `Sink` type. Events are written with +calls to `Sink.Write(event Event)`. Sinks can be wired up in various +configurations to achieve interesting behavior. + +The canonical example is that employed by the +[docker/distribution/notifications](https://godoc.org/github.com/docker/distribution/notifications) +package. Let's say we have a type `httpSink` where we'd like to queue +notifications. As a rule, it should send a single http request and return an +error if it fails: + +```go +func (h *httpSink) Write(event Event) error { + p, err := json.Marshal(event) + if err != nil { + return err + } + body := bytes.NewReader(p) + resp, err := h.client.Post(h.url, "application/json", body) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.Status != 200 { + return errors.New("unexpected status") + } + + return nil +} + +// implement (*httpSink).Close() +``` + +With just that, we can start using components from this package. One can call +`(*httpSink).Write` to send events as the body of a post request to a +configured URL. + +### Retries + +HTTP can be unreliable. The first feature we'd like is to have some retry: + +```go +hs := newHTTPSink(/*...*/) +retry := NewRetryingSink(hs, NewBreaker(5, time.Second)) +``` + +We now have a sink that will retry events against the `httpSink` until they +succeed. The retry will backoff for one second after 5 consecutive failures +using the breaker strategy. + +### Queues + +This isn't quite enough. We we want a sink that doesn't block while we are +waiting for events to be sent. Let's add a `Queue`: + +```go +queue := NewQueue(retry) +``` + +Now, we have an unbounded queue that will work through all events sent with +`(*Queue).Write`. Events can be added asynchronously to the queue without +blocking the current execution path. This is ideal for use in an http request. + +### Broadcast + +It usually turns out that you want to send to more than one listener. We can +use `Broadcaster` to support this: + +```go +var broadcast = NewBroadcaster() // make it available somewhere in your application. +broadcast.Add(queue) // add your queue! +broadcast.Add(queue2) // and another! +``` + +With the above, we can now call `broadcast.Write` in our http handlers and have +all the events distributed to each queue. Because the events are queued, not +listener blocks another. + +### Extending + +For the most part, the above is sufficient for a lot of applications. However, +extending the above functionality can be done implementing your own `Sink`. The +behavior and semantics of the sink can be completely dependent on the +application requirements. The interface is provided below for reference: + +```go +type Sink { + Write(Event) error + Close() error +} +``` + +Application behavior can be controlled by how `Write` behaves. The examples +above are designed to queue the message and return as quickly as possible. +Other implementations may block until the event is committed to durable +storage. + +## Copyright and license + +Copyright © 2016 Docker, Inc. go-events is licensed under the Apache License, +Version 2.0. See [LICENSE](LICENSE) for the full license text. diff --git a/broadcast.go b/broadcast.go new file mode 100644 index 00000000..5120078d --- /dev/null +++ b/broadcast.go @@ -0,0 +1,178 @@ +package events + +import ( + "fmt" + "sync" + + "github.com/sirupsen/logrus" +) + +// Broadcaster sends events to multiple, reliable Sinks. The goal of this +// component is to dispatch events to configured endpoints. Reliability can be +// provided by wrapping incoming sinks. +type Broadcaster struct { + sinks []Sink + events chan Event + adds chan configureRequest + removes chan configureRequest + + shutdown chan struct{} + closed chan struct{} + once sync.Once +} + +// NewBroadcaster appends one or more sinks to the list of sinks. The +// broadcaster behavior will be affected by the properties of the sink. +// Generally, the sink should accept all messages and deal with reliability on +// its own. Use of EventQueue and RetryingSink should be used here. +func NewBroadcaster(sinks ...Sink) *Broadcaster { + b := Broadcaster{ + sinks: sinks, + events: make(chan Event), + adds: make(chan configureRequest), + removes: make(chan configureRequest), + shutdown: make(chan struct{}), + closed: make(chan struct{}), + } + + // Start the broadcaster + go b.run() + + return &b +} + +// Write accepts an event to be dispatched to all sinks. This method will never +// fail and should never block (hopefully!). The caller cedes the memory to the +// broadcaster and should not modify it after calling write. +func (b *Broadcaster) Write(event Event) error { + select { + case b.events <- event: + case <-b.closed: + return ErrSinkClosed + } + return nil +} + +// Add the sink to the broadcaster. +// +// The provided sink must be comparable with equality. Typically, this just +// works with a regular pointer type. +func (b *Broadcaster) Add(sink Sink) error { + return b.configure(b.adds, sink) +} + +// Remove the provided sink. +func (b *Broadcaster) Remove(sink Sink) error { + return b.configure(b.removes, sink) +} + +type configureRequest struct { + sink Sink + response chan error +} + +func (b *Broadcaster) configure(ch chan configureRequest, sink Sink) error { + response := make(chan error, 1) + + for { + select { + case ch <- configureRequest{ + sink: sink, + response: response}: + ch = nil + case err := <-response: + return err + case <-b.closed: + return ErrSinkClosed + } + } +} + +// Close the broadcaster, ensuring that all messages are flushed to the +// underlying sink before returning. +func (b *Broadcaster) Close() error { + b.once.Do(func() { + close(b.shutdown) + }) + + <-b.closed + return nil +} + +// run is the main broadcast loop, started when the broadcaster is created. +// Under normal conditions, it waits for events on the event channel. After +// Close is called, this goroutine will exit. +func (b *Broadcaster) run() { + defer close(b.closed) + remove := func(target Sink) { + for i, sink := range b.sinks { + if sink == target { + b.sinks = append(b.sinks[:i], b.sinks[i+1:]...) + break + } + } + } + + for { + select { + case event := <-b.events: + for _, sink := range b.sinks { + if err := sink.Write(event); err != nil { + if err == ErrSinkClosed { + // remove closed sinks + remove(sink) + continue + } + logrus.WithField("event", event).WithField("events.sink", sink).WithError(err). + Errorf("broadcaster: dropping event") + } + } + case request := <-b.adds: + // while we have to iterate for add/remove, common iteration for + // send is faster against slice. + + var found bool + for _, sink := range b.sinks { + if request.sink == sink { + found = true + break + } + } + + if !found { + b.sinks = append(b.sinks, request.sink) + } + // b.sinks[request.sink] = struct{}{} + request.response <- nil + case request := <-b.removes: + remove(request.sink) + request.response <- nil + case <-b.shutdown: + // close all the underlying sinks + for _, sink := range b.sinks { + if err := sink.Close(); err != nil && err != ErrSinkClosed { + logrus.WithField("events.sink", sink).WithError(err). + Errorf("broadcaster: closing sink failed") + } + } + return + } + } +} + +func (b *Broadcaster) String() string { + // Serialize copy of this broadcaster without the sync.Once, to avoid + // a data race. + + b2 := map[string]interface{}{ + "sinks": b.sinks, + "events": b.events, + "adds": b.adds, + "removes": b.removes, + + "shutdown": b.shutdown, + "closed": b.closed, + } + + return fmt.Sprint(b2) +} diff --git a/broadcast_test.go b/broadcast_test.go new file mode 100644 index 00000000..aeac9e0f --- /dev/null +++ b/broadcast_test.go @@ -0,0 +1,97 @@ +package events + +import ( + "sync" + "testing" +) + +func TestBroadcaster(t *testing.T) { + const nEvents = 1000 + var sinks []Sink + b := NewBroadcaster() + for i := 0; i < 10; i++ { + sinks = append(sinks, newTestSink(t, nEvents)) + b.Add(sinks[i]) + b.Add(sinks[i]) // noop + } + + var wg sync.WaitGroup + for i := 1; i <= nEvents; i++ { + wg.Add(1) + go func(event Event) { + if err := b.Write(event); err != nil { + t.Fatalf("error writing event %v: %v", event, err) + } + wg.Done() + }("event") + } + + wg.Wait() // Wait until writes complete + + for i := range sinks { + b.Remove(sinks[i]) + } + + // sending one more should trigger test failure if they weren't removed. + if err := b.Write("onemore"); err != nil { + t.Fatalf("unexpected error sending one more: %v", err) + } + + // add them back to test closing. + for i := range sinks { + b.Add(sinks[i]) + } + + checkClose(t, b) + + // Iterate through the sinks and check that they all have the expected length. + for _, sink := range sinks { + ts := sink.(*testSink) + ts.mu.Lock() + defer ts.mu.Unlock() + + if len(ts.events) != nEvents { + t.Fatalf("not all events ended up in testsink: len(testSink) == %d, not %d", len(ts.events), nEvents) + } + + if !ts.closed { + t.Fatalf("sink should have been closed") + } + } +} + +func BenchmarkBroadcast10(b *testing.B) { + benchmarkBroadcast(b, 10) +} + +func BenchmarkBroadcast100(b *testing.B) { + benchmarkBroadcast(b, 100) +} + +func BenchmarkBroadcast1000(b *testing.B) { + benchmarkBroadcast(b, 1000) +} + +func BenchmarkBroadcast10000(b *testing.B) { + benchmarkBroadcast(b, 10000) +} + +func benchmarkBroadcast(b *testing.B, nsinks int) { + // counter := metrics.NewCounter() + // metrics.DefaultRegistry.Register(fmt.Sprintf("nsinks: %v", nsinks), counter) + // go metrics.Log(metrics.DefaultRegistry, 500*time.Millisecond, log.New(os.Stderr, "metrics: ", log.LstdFlags)) + + b.StopTimer() + var sinks []Sink + for i := 0; i < nsinks; i++ { + // counter.Inc(1) + sinks = append(sinks, newTestSink(b, b.N)) + // sinks = append(sinks, NewQueue(&testSink{t: b, expected: b.N})) + } + b.StartTimer() + + // meter := metered{} + // NewQueue(meter.Egress(dst)) + + benchmarkSink(b, NewBroadcaster(sinks...)) +} diff --git a/channel.go b/channel.go new file mode 100644 index 00000000..802cf51f --- /dev/null +++ b/channel.go @@ -0,0 +1,61 @@ +package events + +import ( + "fmt" + "sync" +) + +// Channel provides a sink that can be listened on. The writer and channel +// listener must operate in separate goroutines. +// +// Consumers should listen on Channel.C until Closed is closed. +type Channel struct { + C chan Event + + closed chan struct{} + once sync.Once +} + +// NewChannel returns a channel. If buffer is zero, the channel is +// unbuffered. +func NewChannel(buffer int) *Channel { + return &Channel{ + C: make(chan Event, buffer), + closed: make(chan struct{}), + } +} + +// Done returns a channel that will always proceed once the sink is closed. +func (ch *Channel) Done() chan struct{} { + return ch.closed +} + +// Write the event to the channel. Must be called in a separate goroutine from +// the listener. +func (ch *Channel) Write(event Event) error { + select { + case ch.C <- event: + return nil + case <-ch.closed: + return ErrSinkClosed + } +} + +// Close the channel sink. +func (ch *Channel) Close() error { + ch.once.Do(func() { + close(ch.closed) + }) + + return nil +} + +func (ch *Channel) String() string { + // Serialize a copy of the Channel that doesn't contain the sync.Once, + // to avoid a data race. + ch2 := map[string]interface{}{ + "C": ch.C, + "closed": ch.closed, + } + return fmt.Sprint(ch2) +} diff --git a/channel_test.go b/channel_test.go new file mode 100644 index 00000000..63cc76d4 --- /dev/null +++ b/channel_test.go @@ -0,0 +1,57 @@ +package events + +import ( + "fmt" + "sync" + "testing" +) + +func TestChannel(t *testing.T) { + const nevents = 100 + + sink := NewChannel(0) + + go func() { + var wg sync.WaitGroup + for i := 1; i <= nevents; i++ { + event := "event-" + fmt.Sprint(i) + wg.Add(1) + go func(event Event) { + defer wg.Done() + if err := sink.Write(event); err != nil { + t.Fatalf("error writing event: %v", err) + } + }(event) + } + wg.Wait() + sink.Close() + + // now send another bunch of events and ensure we stay closed + for i := 1; i <= nevents; i++ { + if err := sink.Write(i); err != ErrSinkClosed { + t.Fatalf("unexpected error: %v != %v", err, ErrSinkClosed) + } + } + }() + + var received int +loop: + for { + select { + case <-sink.C: + received++ + case <-sink.Done(): + break loop + } + } + + sink.Close() + _, ok := <-sink.Done() // test will timeout if this hangs + if ok { + t.Fatalf("done should be a closed channel") + } + + if received != nevents { + t.Fatalf("events did not make it through sink: %v != %v", received, nevents) + } +} diff --git a/common_test.go b/common_test.go new file mode 100644 index 00000000..14f940bd --- /dev/null +++ b/common_test.go @@ -0,0 +1,117 @@ +package events + +import ( + "fmt" + "math/rand" + "sync" + "testing" + "time" +) + +type tOrB interface { + Fatalf(format string, args ...interface{}) + Logf(format string, args ...interface{}) +} + +type testSink struct { + t tOrB + + events []Event + expected int + mu sync.Mutex + closed bool +} + +func newTestSink(t tOrB, expected int) *testSink { + return &testSink{ + t: t, + events: make([]Event, 0, expected), // pre-allocate so we aren't benching alloc + expected: expected, + } +} + +func (ts *testSink) Write(event Event) error { + ts.mu.Lock() + defer ts.mu.Unlock() + + if ts.closed { + return ErrSinkClosed + } + + ts.events = append(ts.events, event) + + if len(ts.events) > ts.expected { + ts.t.Fatalf("len(ts.events) == %v, expected %v", len(ts.events), ts.expected) + } + + return nil +} + +func (ts *testSink) Close() error { + ts.mu.Lock() + defer ts.mu.Unlock() + if ts.closed { + return ErrSinkClosed + } + + ts.closed = true + + if len(ts.events) != ts.expected { + ts.t.Fatalf("len(ts.events) == %v, expected %v", len(ts.events), ts.expected) + } + + return nil +} + +type delayedSink struct { + Sink + delay time.Duration +} + +func (ds *delayedSink) Write(event Event) error { + time.Sleep(ds.delay) + return ds.Sink.Write(event) +} + +type flakySink struct { + Sink + rate float64 + mu sync.Mutex +} + +func (fs *flakySink) Write(event Event) error { + fs.mu.Lock() + defer fs.mu.Unlock() + + if rand.Float64() < fs.rate { + return fmt.Errorf("error writing event: %v", event) + } + + return fs.Sink.Write(event) +} + +func checkClose(t *testing.T, sink Sink) { + if err := sink.Close(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + + // second close should not crash but should return an error. + if err := sink.Close(); err != nil { + t.Fatalf("unexpected error on double close: %v", err) + } + + // Write after closed should be an error + if err := sink.Write("fail"); err == nil { + t.Fatalf("write after closed did not have an error") + } else if err != ErrSinkClosed { + t.Fatalf("error should be ErrSinkClosed") + } +} + +func benchmarkSink(b *testing.B, sink Sink) { + defer sink.Close() + var event = "myevent" + for i := 0; i < b.N; i++ { + sink.Write(event) + } +} diff --git a/errors.go b/errors.go new file mode 100644 index 00000000..56db7c25 --- /dev/null +++ b/errors.go @@ -0,0 +1,10 @@ +package events + +import "fmt" + +var ( + // ErrSinkClosed is returned if a write is issued to a sink that has been + // closed. If encountered, the error should be considered terminal and + // retries will not be successful. + ErrSinkClosed = fmt.Errorf("events: sink closed") +) diff --git a/event.go b/event.go new file mode 100644 index 00000000..f0f1d9ea --- /dev/null +++ b/event.go @@ -0,0 +1,15 @@ +package events + +// Event marks items that can be sent as events. +type Event interface{} + +// Sink accepts and sends events. +type Sink interface { + // Write an event to the Sink. If no error is returned, the caller will + // assume that all events have been committed to the sink. If an error is + // received, the caller may retry sending the event. + Write(event Event) error + + // Close the sink, possibly waiting for pending events to flush. + Close() error +} diff --git a/filter.go b/filter.go new file mode 100644 index 00000000..e6c0eb69 --- /dev/null +++ b/filter.go @@ -0,0 +1,52 @@ +package events + +// Matcher matches events. +type Matcher interface { + Match(event Event) bool +} + +// MatcherFunc implements matcher with just a function. +type MatcherFunc func(event Event) bool + +// Match calls the wrapped function. +func (fn MatcherFunc) Match(event Event) bool { + return fn(event) +} + +// Filter provides an event sink that sends only events that are accepted by a +// Matcher. No methods on filter are goroutine safe. +type Filter struct { + dst Sink + matcher Matcher + closed bool +} + +// NewFilter returns a new filter that will send to events to dst that return +// true for Matcher. +func NewFilter(dst Sink, matcher Matcher) Sink { + return &Filter{dst: dst, matcher: matcher} +} + +// Write an event to the filter. +func (f *Filter) Write(event Event) error { + if f.closed { + return ErrSinkClosed + } + + if f.matcher.Match(event) { + return f.dst.Write(event) + } + + return nil +} + +// Close the filter and allow no more events to pass through. +func (f *Filter) Close() error { + // TODO(stevvooe): Not all sinks should have Close. + if f.closed { + return nil + } + + f.closed = true + return f.dst.Close() +} diff --git a/filter_test.go b/filter_test.go new file mode 100644 index 00000000..9f51f2db --- /dev/null +++ b/filter_test.go @@ -0,0 +1,21 @@ +package events + +import "testing" + +func TestFilter(t *testing.T) { + const nevents = 100 + ts := newTestSink(t, nevents/2) + filter := NewFilter(ts, MatcherFunc(func(event Event) bool { + i, ok := event.(int) + return ok && i%2 == 0 + })) + + for i := 0; i < nevents; i++ { + if err := filter.Write(i); err != nil { + t.Fatalf("unexpected error writing event: %v", err) + } + } + + checkClose(t, filter) + +} diff --git a/queue.go b/queue.go new file mode 100644 index 00000000..4bb770af --- /dev/null +++ b/queue.go @@ -0,0 +1,111 @@ +package events + +import ( + "container/list" + "sync" + + "github.com/sirupsen/logrus" +) + +// Queue accepts all messages into a queue for asynchronous consumption +// by a sink. It is unbounded and thread safe but the sink must be reliable or +// events will be dropped. +type Queue struct { + dst Sink + events *list.List + cond *sync.Cond + mu sync.Mutex + closed bool +} + +// NewQueue returns a queue to the provided Sink dst. +func NewQueue(dst Sink) *Queue { + eq := Queue{ + dst: dst, + events: list.New(), + } + + eq.cond = sync.NewCond(&eq.mu) + go eq.run() + return &eq +} + +// Write accepts the events into the queue, only failing if the queue has +// been closed. +func (eq *Queue) Write(event Event) error { + eq.mu.Lock() + defer eq.mu.Unlock() + + if eq.closed { + return ErrSinkClosed + } + + eq.events.PushBack(event) + eq.cond.Signal() // signal waiters + + return nil +} + +// Close shutsdown the event queue, flushing +func (eq *Queue) Close() error { + eq.mu.Lock() + defer eq.mu.Unlock() + + if eq.closed { + return nil + } + + // set closed flag + eq.closed = true + eq.cond.Signal() // signal flushes queue + eq.cond.Wait() // wait for signal from last flush + return eq.dst.Close() +} + +// run is the main goroutine to flush events to the target sink. +func (eq *Queue) run() { + for { + event := eq.next() + + if event == nil { + return // nil block means event queue is closed. + } + + if err := eq.dst.Write(event); err != nil { + // TODO(aaronl): Dropping events could be bad depending + // on the application. We should have a way of + // communicating this condition. However, logging + // at a log level above debug may not be appropriate. + // Eventually, go-events should not use logrus at all, + // and should bubble up conditions like this through + // error values. + logrus.WithFields(logrus.Fields{ + "event": event, + "sink": eq.dst, + }).WithError(err).Debug("eventqueue: dropped event") + } + } +} + +// next encompasses the critical section of the run loop. When the queue is +// empty, it will block on the condition. If new data arrives, it will wake +// and return a block. When closed, a nil slice will be returned. +func (eq *Queue) next() Event { + eq.mu.Lock() + defer eq.mu.Unlock() + + for eq.events.Len() < 1 { + if eq.closed { + eq.cond.Broadcast() + return nil + } + + eq.cond.Wait() + } + + front := eq.events.Front() + block := front.Value.(Event) + eq.events.Remove(front) + + return block +} diff --git a/queue_test.go b/queue_test.go new file mode 100644 index 00000000..7bfe0e3d --- /dev/null +++ b/queue_test.go @@ -0,0 +1,46 @@ +package events + +import ( + "fmt" + "sync" + "testing" + "time" +) + +func TestQueue(t *testing.T) { + const nevents = 1000 + + ts := newTestSink(t, nevents) + eq := NewQueue( + // delayed sync simulates destination slower than channel comms + &delayedSink{ + Sink: ts, + delay: time.Millisecond * 1, + }) + time.Sleep(10 * time.Millisecond) // let's queue settle to wait conidition. + + var wg sync.WaitGroup + for i := 1; i <= nevents; i++ { + wg.Add(1) + go func(event Event) { + if err := eq.Write(event); err != nil { + t.Fatalf("error writing event: %v", err) + } + wg.Done() + }("event-" + fmt.Sprint(i)) + } + + wg.Wait() + checkClose(t, eq) + + ts.mu.Lock() + defer ts.mu.Unlock() + + if len(ts.events) != nevents { + t.Fatalf("events did not make it to the sink: %d != %d", len(ts.events), 1000) + } + + if !ts.closed { + t.Fatalf("sink should have been closed") + } +} diff --git a/retry.go b/retry.go new file mode 100644 index 00000000..2df55d21 --- /dev/null +++ b/retry.go @@ -0,0 +1,260 @@ +package events + +import ( + "fmt" + "math/rand" + "sync" + "sync/atomic" + "time" + + "github.com/sirupsen/logrus" +) + +// RetryingSink retries the write until success or an ErrSinkClosed is +// returned. Underlying sink must have p > 0 of succeeding or the sink will +// block. Retry is configured with a RetryStrategy. Concurrent calls to a +// retrying sink are serialized through the sink, meaning that if one is +// in-flight, another will not proceed. +type RetryingSink struct { + sink Sink + strategy RetryStrategy + closed chan struct{} + once sync.Once +} + +// NewRetryingSink returns a sink that will retry writes to a sink, backing +// off on failure. Parameters threshold and backoff adjust the behavior of the +// circuit breaker. +func NewRetryingSink(sink Sink, strategy RetryStrategy) *RetryingSink { + rs := &RetryingSink{ + sink: sink, + strategy: strategy, + closed: make(chan struct{}), + } + + return rs +} + +// Write attempts to flush the events to the downstream sink until it succeeds +// or the sink is closed. +func (rs *RetryingSink) Write(event Event) error { + logger := logrus.WithField("event", event) + +retry: + select { + case <-rs.closed: + return ErrSinkClosed + default: + } + + if backoff := rs.strategy.Proceed(event); backoff > 0 { + select { + case <-time.After(backoff): + // TODO(stevvooe): This branch holds up the next try. Before, we + // would simply break to the "retry" label and then possibly wait + // again. However, this requires all retry strategies to have a + // large probability of probing the sync for success, rather than + // just backing off and sending the request. + case <-rs.closed: + return ErrSinkClosed + } + } + + if err := rs.sink.Write(event); err != nil { + if err == ErrSinkClosed { + // terminal! + return err + } + + logger := logger.WithError(err) // shadow!! + + if rs.strategy.Failure(event, err) { + logger.Errorf("retryingsink: dropped event") + return nil + } + + logger.Errorf("retryingsink: error writing event, retrying") + goto retry + } + + rs.strategy.Success(event) + return nil +} + +// Close closes the sink and the underlying sink. +func (rs *RetryingSink) Close() error { + rs.once.Do(func() { + close(rs.closed) + }) + + return nil +} + +func (rs *RetryingSink) String() string { + // Serialize a copy of the RetryingSink without the sync.Once, to avoid + // a data race. + rs2 := map[string]interface{}{ + "sink": rs.sink, + "strategy": rs.strategy, + "closed": rs.closed, + } + return fmt.Sprint(rs2) +} + +// RetryStrategy defines a strategy for retrying event sink writes. +// +// All methods should be goroutine safe. +type RetryStrategy interface { + // Proceed is called before every event send. If proceed returns a + // positive, non-zero integer, the retryer will back off by the provided + // duration. + // + // An event is provided, by may be ignored. + Proceed(event Event) time.Duration + + // Failure reports a failure to the strategy. If this method returns true, + // the event should be dropped. + Failure(event Event, err error) bool + + // Success should be called when an event is sent successfully. + Success(event Event) +} + +// Breaker implements a circuit breaker retry strategy. +// +// The current implementation never drops events. +type Breaker struct { + threshold int + recent int + last time.Time + backoff time.Duration // time after which we retry after failure. + mu sync.Mutex +} + +var _ RetryStrategy = &Breaker{} + +// NewBreaker returns a breaker that will backoff after the threshold has been +// tripped. A Breaker is thread safe and may be shared by many goroutines. +func NewBreaker(threshold int, backoff time.Duration) *Breaker { + return &Breaker{ + threshold: threshold, + backoff: backoff, + } +} + +// Proceed checks the failures against the threshold. +func (b *Breaker) Proceed(event Event) time.Duration { + b.mu.Lock() + defer b.mu.Unlock() + + if b.recent < b.threshold { + return 0 + } + + return b.last.Add(b.backoff).Sub(time.Now()) +} + +// Success resets the breaker. +func (b *Breaker) Success(event Event) { + b.mu.Lock() + defer b.mu.Unlock() + + b.recent = 0 + b.last = time.Time{} +} + +// Failure records the failure and latest failure time. +func (b *Breaker) Failure(event Event, err error) bool { + b.mu.Lock() + defer b.mu.Unlock() + + b.recent++ + b.last = time.Now().UTC() + return false // never drop events. +} + +var ( + // DefaultExponentialBackoffConfig provides a default configuration for + // exponential backoff. + DefaultExponentialBackoffConfig = ExponentialBackoffConfig{ + Base: time.Second, + Factor: time.Second, + Max: 20 * time.Second, + } +) + +// ExponentialBackoffConfig configures backoff parameters. +// +// Note that these parameters operate on the upper bound for choosing a random +// value. For example, at Base=1s, a random value in [0,1s) will be chosen for +// the backoff value. +type ExponentialBackoffConfig struct { + // Base is the minimum bound for backing off after failure. + Base time.Duration + + // Factor sets the amount of time by which the backoff grows with each + // failure. + Factor time.Duration + + // Max is the absolute maxiumum bound for a single backoff. + Max time.Duration +} + +// ExponentialBackoff implements random backoff with exponentially increasing +// bounds as the number consecutive failures increase. +type ExponentialBackoff struct { + config ExponentialBackoffConfig + failures uint64 // consecutive failure counter. +} + +// NewExponentialBackoff returns an exponential backoff strategy with the +// desired config. If config is nil, the default is returned. +func NewExponentialBackoff(config ExponentialBackoffConfig) *ExponentialBackoff { + return &ExponentialBackoff{ + config: config, + } +} + +// Proceed returns the next randomly bound exponential backoff time. +func (b *ExponentialBackoff) Proceed(event Event) time.Duration { + return b.backoff(atomic.LoadUint64(&b.failures)) +} + +// Success resets the failures counter. +func (b *ExponentialBackoff) Success(event Event) { + atomic.StoreUint64(&b.failures, 0) +} + +// Failure increments the failure counter. +func (b *ExponentialBackoff) Failure(event Event, err error) bool { + atomic.AddUint64(&b.failures, 1) + return false +} + +// backoff calculates the amount of time to wait based on the number of +// consecutive failures. +func (b *ExponentialBackoff) backoff(failures uint64) time.Duration { + if failures <= 0 { + // proceed normally when there are no failures. + return 0 + } + + factor := b.config.Factor + if factor <= 0 { + factor = DefaultExponentialBackoffConfig.Factor + } + + backoff := b.config.Base + factor*time.Duration(1<<(failures-1)) + + max := b.config.Max + if max <= 0 { + max = DefaultExponentialBackoffConfig.Max + } + + if backoff > max || backoff < 0 { + backoff = max + } + + // Choose a uniformly distributed value from [0, backoff). + return time.Duration(rand.Int63n(int64(backoff))) +} diff --git a/retry_test.go b/retry_test.go new file mode 100644 index 00000000..eba344e2 --- /dev/null +++ b/retry_test.go @@ -0,0 +1,96 @@ +package events + +import ( + "fmt" + "sync" + "testing" + "time" +) + +func TestRetryingSinkBreaker(t *testing.T) { + testRetryingSink(t, NewBreaker(3, 10*time.Millisecond)) +} + +func TestRetryingSinkExponentialBackoff(t *testing.T) { + testRetryingSink(t, NewExponentialBackoff(ExponentialBackoffConfig{ + Base: time.Millisecond, + Factor: time.Millisecond, + Max: time.Millisecond * 5, + })) +} + +func testRetryingSink(t *testing.T, strategy RetryStrategy) { + const nevents = 100 + ts := newTestSink(t, nevents) + + // Make a sync that fails most of the time, ensuring that all the events + // make it through. + flaky := &flakySink{ + rate: 1.0, // start out always failing. + Sink: ts, + } + + s := NewRetryingSink(flaky, strategy) + + var wg sync.WaitGroup + for i := 1; i <= nevents; i++ { + event := "myevent-" + fmt.Sprint(i) + + // Above 50, set the failure rate lower + if i > 50 { + flaky.mu.Lock() + flaky.rate = 0.9 + flaky.mu.Unlock() + } + + wg.Add(1) + go func(event Event) { + defer wg.Done() + if err := s.Write(event); err != nil { + t.Fatalf("error writing event: %v", err) + } + }(event) + } + + wg.Wait() + checkClose(t, s) + + ts.mu.Lock() + defer ts.mu.Unlock() +} + +func TestExponentialBackoff(t *testing.T) { + strategy := NewExponentialBackoff(DefaultExponentialBackoffConfig) + backoff := strategy.Proceed(nil) + + if backoff != 0 { + t.Errorf("untouched backoff should be zero-wait: %v != 0", backoff) + } + + expected := strategy.config.Base + strategy.config.Factor + for i := 1; i <= 10; i++ { + if strategy.Failure(nil, nil) { + t.Errorf("no facilities for dropping events in ExponentialBackoff") + } + + for j := 0; j < 1000; j++ { + // sample this several thousand times. + backoff := strategy.Proceed(nil) + if backoff > expected { + t.Fatalf("expected must be bounded by %v after %v failures: %v", expected, i, backoff) + } + } + + expected = strategy.config.Base + strategy.config.Factor*time.Duration(1< strategy.config.Max { + expected = strategy.config.Max + } + } + + strategy.Success(nil) // recovery! + + backoff = strategy.Proceed(nil) + if backoff != 0 { + t.Errorf("should have recovered: %v != 0", backoff) + } +}