Import docker.io_19.03.4+dfsg2.orig-go-events.tar.xz
authorArnaud Rebillout <arnaud.rebillout@collabora.com>
Mon, 4 Nov 2019 09:06:27 +0000 (09:06 +0000)
committerArnaud Rebillout <arnaud.rebillout@collabora.com>
Mon, 4 Nov 2019 09:06:27 +0000 (09:06 +0000)
[dgit import orig docker.io_19.03.4+dfsg2.orig-go-events.tar.xz]

17 files changed:
CONTRIBUTING.md [new file with mode: 0644]
LICENSE [new file with mode: 0644]
MAINTAINERS [new file with mode: 0644]
README.md [new file with mode: 0644]
broadcast.go [new file with mode: 0644]
broadcast_test.go [new file with mode: 0644]
channel.go [new file with mode: 0644]
channel_test.go [new file with mode: 0644]
common_test.go [new file with mode: 0644]
errors.go [new file with mode: 0644]
event.go [new file with mode: 0644]
filter.go [new file with mode: 0644]
filter_test.go [new file with mode: 0644]
queue.go [new file with mode: 0644]
queue_test.go [new file with mode: 0644]
retry.go [new file with mode: 0644]
retry_test.go [new file with mode: 0644]

diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
new file mode 100644 (file)
index 0000000..d813af7
--- /dev/null
@@ -0,0 +1,70 @@
+# Contributing to Docker open source projects
+
+Want to hack on go-events? Awesome! Here are instructions to get you started.
+
+go-events is part of the [Docker](https://www.docker.com) project, and
+follows the same rules and principles. If you're already familiar with the way
+Docker does things, you'll feel right at home.
+
+Otherwise, go read Docker's
+[contributions guidelines](https://github.com/docker/docker/blob/master/CONTRIBUTING.md),
+[issue triaging](https://github.com/docker/docker/blob/master/project/ISSUE-TRIAGE.md),
+[review process](https://github.com/docker/docker/blob/master/project/REVIEWING.md) and
+[branches and tags](https://github.com/docker/docker/blob/master/project/BRANCHES-AND-TAGS.md).
+
+For an in-depth description of our contribution process, visit the
+contributors guide: [Understand how to contribute](https://docs.docker.com/opensource/workflow/make-a-contribution/)
+
+### Sign your work
+
+The sign-off is a simple line at the end of the explanation for the patch. Your
+signature certifies that you wrote the patch or otherwise have the right to pass
+it on as an open-source patch. The rules are pretty simple: if you can certify
+the below (from [developercertificate.org](http://developercertificate.org/)):
+
+```
+Developer Certificate of Origin
+Version 1.1
+
+Copyright (C) 2004, 2006 The Linux Foundation and its contributors.
+660 York Street, Suite 102,
+San Francisco, CA 94110 USA
+
+Everyone is permitted to copy and distribute verbatim copies of this
+license document, but changing it is not allowed.
+
+Developer's Certificate of Origin 1.1
+
+By making a contribution to this project, I certify that:
+
+(a) The contribution was created in whole or in part by me and I
+    have the right to submit it under the open source license
+    indicated in the file; or
+
+(b) The contribution is based upon previous work that, to the best
+    of my knowledge, is covered under an appropriate open source
+    license and I have the right under that license to submit that
+    work with modifications, whether created in whole or in part
+    by me, under the same open source license (unless I am
+    permitted to submit under a different license), as indicated
+    in the file; or
+
+(c) The contribution was provided directly to me by some other
+    person who certified (a), (b) or (c) and I have not modified
+    it.
+
+(d) I understand and agree that this project and the contribution
+    are public and that a record of the contribution (including all
+    personal information I submit with it, including my sign-off) is
+    maintained indefinitely and may be redistributed consistent with
+    this project or the open source license(s) involved.
+```
+
+Then you just add a line to every git commit message:
+
+    Signed-off-by: Joe Smith <joe.smith@email.com>
+
+Use your real name (sorry, no pseudonyms or anonymous contributions.)
+
+If you set your `user.name` and `user.email` git configs, you can sign your
+commit automatically with `git commit -s`.
diff --git a/LICENSE b/LICENSE
new file mode 100644 (file)
index 0000000..6d630cf
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,201 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "{}"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright 2016 Docker, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/MAINTAINERS b/MAINTAINERS
new file mode 100644 (file)
index 0000000..e414d82
--- /dev/null
@@ -0,0 +1,46 @@
+# go-events maintainers file
+#
+# This file describes who runs the docker/go-events project and how.
+# This is a living document - if you see something out of date or missing, speak up!
+#
+# It is structured to be consumable by both humans and programs.
+# To extract its contents programmatically, use any TOML-compliant parser.
+#
+# This file is compiled into the MAINTAINERS file in docker/opensource.
+#
+[Org]
+       [Org."Core maintainers"]
+               people = [
+                       "aaronlehmann",
+                       "aluzzardi",
+                       "lk4d4",
+                       "stevvooe",
+               ]
+
+[people]
+
+# A reference list of all people associated with the project.
+# All other sections should refer to people by their canonical key
+# in the people section.
+
+       # ADD YOURSELF HERE IN ALPHABETICAL ORDER
+
+       [people.aaronlehmann]
+       Name = "Aaron Lehmann"
+       Email = "aaron.lehmann@docker.com"
+       GitHub = "aaronlehmann"
+
+       [people.aluzzardi]
+       Name = "Andrea Luzzardi"
+       Email = "al@docker.com"
+       GitHub = "aluzzardi"
+
+       [people.lk4d4]
+       Name = "Alexander Morozov"
+       Email = "lk4d4@docker.com"
+       GitHub = "lk4d4"
+
+       [people.stevvooe]
+       Name = "Stephen Day"
+       Email = "stephen.day@docker.com"
+       GitHub = "stevvooe"
diff --git a/README.md b/README.md
new file mode 100644 (file)
index 0000000..0acafc2
--- /dev/null
+++ b/README.md
@@ -0,0 +1,117 @@
+# Docker Events Package
+
+[![GoDoc](https://godoc.org/github.com/docker/go-events?status.svg)](https://godoc.org/github.com/docker/go-events)
+[![Circle CI](https://circleci.com/gh/docker/go-events.svg?style=shield)](https://circleci.com/gh/docker/go-events)
+
+The Docker `events` package implements a composable event distribution package
+for Go.
+
+Originally created to implement the [notifications in Docker Registry
+2](https://github.com/docker/distribution/blob/master/docs/notifications.md),
+we've found the pattern to be useful in other applications. This package is
+most of the same code with slightly updated interfaces. Much of the internals
+have been made available.
+
+## Usage
+
+The `events` package centers around a `Sink` type.  Events are written with
+calls to `Sink.Write(event Event)`. Sinks can be wired up in various
+configurations to achieve interesting behavior.
+
+The canonical example is that employed by the
+[docker/distribution/notifications](https://godoc.org/github.com/docker/distribution/notifications)
+package. Let's say we have a type `httpSink` where we'd like to queue
+notifications. As a rule, it should send a single http request and return an
+error if it fails:
+
+```go
+func (h *httpSink) Write(event Event) error {
+       p, err := json.Marshal(event)
+       if err != nil {
+               return err
+       }
+       body := bytes.NewReader(p)
+       resp, err := h.client.Post(h.url, "application/json", body)
+       if err != nil {
+               return err
+       }
+       defer resp.Body.Close()
+       
+       if resp.Status != 200 {
+               return errors.New("unexpected status")
+       }
+
+       return nil
+}
+
+// implement (*httpSink).Close()
+```
+
+With just that, we can start using components from this package. One can call
+`(*httpSink).Write` to send events as the body of a post request to a
+configured URL.
+
+### Retries
+
+HTTP can be unreliable. The first feature we'd like is to have some retry:
+
+```go
+hs := newHTTPSink(/*...*/)
+retry := NewRetryingSink(hs, NewBreaker(5, time.Second))
+```
+
+We now have a sink that will retry events against the `httpSink` until they
+succeed. The retry will backoff for one second after 5 consecutive failures
+using the breaker strategy.
+
+### Queues
+
+This isn't quite enough. We we want a sink that doesn't block while we are
+waiting for events to be sent. Let's add a `Queue`:
+
+```go
+queue := NewQueue(retry)
+```
+
+Now, we have an unbounded queue that will work through all events sent with
+`(*Queue).Write`. Events can be added asynchronously to the queue without
+blocking the current execution path. This is ideal for use in an http request.
+
+### Broadcast
+
+It usually turns out that you want to send to more than one listener. We can
+use `Broadcaster` to support this:
+
+```go
+var broadcast = NewBroadcaster() // make it available somewhere in your application.
+broadcast.Add(queue) // add your queue!
+broadcast.Add(queue2) // and another!
+```
+
+With the above, we can now call `broadcast.Write` in our http handlers and have
+all the events distributed to each queue. Because the events are queued, not
+listener blocks another.
+
+### Extending
+
+For the most part, the above is sufficient for a lot of applications. However,
+extending the above functionality can be done implementing your own `Sink`. The
+behavior and semantics of the sink can be completely dependent on the
+application requirements. The interface is provided below for reference:
+
+```go
+type Sink {
+       Write(Event) error
+       Close() error
+}
+```
+
+Application behavior can be controlled by how `Write` behaves. The examples
+above are designed to queue the message and return as quickly as possible.
+Other implementations may block until the event is committed to durable
+storage.
+
+## Copyright and license
+
+Copyright Â© 2016 Docker, Inc. go-events is licensed under the Apache License,
+Version 2.0. See [LICENSE](LICENSE) for the full license text.
diff --git a/broadcast.go b/broadcast.go
new file mode 100644 (file)
index 0000000..5120078
--- /dev/null
@@ -0,0 +1,178 @@
+package events
+
+import (
+       "fmt"
+       "sync"
+
+       "github.com/sirupsen/logrus"
+)
+
+// Broadcaster sends events to multiple, reliable Sinks. The goal of this
+// component is to dispatch events to configured endpoints. Reliability can be
+// provided by wrapping incoming sinks.
+type Broadcaster struct {
+       sinks   []Sink
+       events  chan Event
+       adds    chan configureRequest
+       removes chan configureRequest
+
+       shutdown chan struct{}
+       closed   chan struct{}
+       once     sync.Once
+}
+
+// NewBroadcaster appends one or more sinks to the list of sinks. The
+// broadcaster behavior will be affected by the properties of the sink.
+// Generally, the sink should accept all messages and deal with reliability on
+// its own. Use of EventQueue and RetryingSink should be used here.
+func NewBroadcaster(sinks ...Sink) *Broadcaster {
+       b := Broadcaster{
+               sinks:    sinks,
+               events:   make(chan Event),
+               adds:     make(chan configureRequest),
+               removes:  make(chan configureRequest),
+               shutdown: make(chan struct{}),
+               closed:   make(chan struct{}),
+       }
+
+       // Start the broadcaster
+       go b.run()
+
+       return &b
+}
+
+// Write accepts an event to be dispatched to all sinks. This method will never
+// fail and should never block (hopefully!). The caller cedes the memory to the
+// broadcaster and should not modify it after calling write.
+func (b *Broadcaster) Write(event Event) error {
+       select {
+       case b.events <- event:
+       case <-b.closed:
+               return ErrSinkClosed
+       }
+       return nil
+}
+
+// Add the sink to the broadcaster.
+//
+// The provided sink must be comparable with equality. Typically, this just
+// works with a regular pointer type.
+func (b *Broadcaster) Add(sink Sink) error {
+       return b.configure(b.adds, sink)
+}
+
+// Remove the provided sink.
+func (b *Broadcaster) Remove(sink Sink) error {
+       return b.configure(b.removes, sink)
+}
+
+type configureRequest struct {
+       sink     Sink
+       response chan error
+}
+
+func (b *Broadcaster) configure(ch chan configureRequest, sink Sink) error {
+       response := make(chan error, 1)
+
+       for {
+               select {
+               case ch <- configureRequest{
+                       sink:     sink,
+                       response: response}:
+                       ch = nil
+               case err := <-response:
+                       return err
+               case <-b.closed:
+                       return ErrSinkClosed
+               }
+       }
+}
+
+// Close the broadcaster, ensuring that all messages are flushed to the
+// underlying sink before returning.
+func (b *Broadcaster) Close() error {
+       b.once.Do(func() {
+               close(b.shutdown)
+       })
+
+       <-b.closed
+       return nil
+}
+
+// run is the main broadcast loop, started when the broadcaster is created.
+// Under normal conditions, it waits for events on the event channel. After
+// Close is called, this goroutine will exit.
+func (b *Broadcaster) run() {
+       defer close(b.closed)
+       remove := func(target Sink) {
+               for i, sink := range b.sinks {
+                       if sink == target {
+                               b.sinks = append(b.sinks[:i], b.sinks[i+1:]...)
+                               break
+                       }
+               }
+       }
+
+       for {
+               select {
+               case event := <-b.events:
+                       for _, sink := range b.sinks {
+                               if err := sink.Write(event); err != nil {
+                                       if err == ErrSinkClosed {
+                                               // remove closed sinks
+                                               remove(sink)
+                                               continue
+                                       }
+                                       logrus.WithField("event", event).WithField("events.sink", sink).WithError(err).
+                                               Errorf("broadcaster: dropping event")
+                               }
+                       }
+               case request := <-b.adds:
+                       // while we have to iterate for add/remove, common iteration for
+                       // send is faster against slice.
+
+                       var found bool
+                       for _, sink := range b.sinks {
+                               if request.sink == sink {
+                                       found = true
+                                       break
+                               }
+                       }
+
+                       if !found {
+                               b.sinks = append(b.sinks, request.sink)
+                       }
+                       // b.sinks[request.sink] = struct{}{}
+                       request.response <- nil
+               case request := <-b.removes:
+                       remove(request.sink)
+                       request.response <- nil
+               case <-b.shutdown:
+                       // close all the underlying sinks
+                       for _, sink := range b.sinks {
+                               if err := sink.Close(); err != nil && err != ErrSinkClosed {
+                                       logrus.WithField("events.sink", sink).WithError(err).
+                                               Errorf("broadcaster: closing sink failed")
+                               }
+                       }
+                       return
+               }
+       }
+}
+
+func (b *Broadcaster) String() string {
+       // Serialize copy of this broadcaster without the sync.Once, to avoid
+       // a data race.
+
+       b2 := map[string]interface{}{
+               "sinks":   b.sinks,
+               "events":  b.events,
+               "adds":    b.adds,
+               "removes": b.removes,
+
+               "shutdown": b.shutdown,
+               "closed":   b.closed,
+       }
+
+       return fmt.Sprint(b2)
+}
diff --git a/broadcast_test.go b/broadcast_test.go
new file mode 100644 (file)
index 0000000..aeac9e0
--- /dev/null
@@ -0,0 +1,97 @@
+package events
+
+import (
+       "sync"
+       "testing"
+)
+
+func TestBroadcaster(t *testing.T) {
+       const nEvents = 1000
+       var sinks []Sink
+       b := NewBroadcaster()
+       for i := 0; i < 10; i++ {
+               sinks = append(sinks, newTestSink(t, nEvents))
+               b.Add(sinks[i])
+               b.Add(sinks[i]) // noop
+       }
+
+       var wg sync.WaitGroup
+       for i := 1; i <= nEvents; i++ {
+               wg.Add(1)
+               go func(event Event) {
+                       if err := b.Write(event); err != nil {
+                               t.Fatalf("error writing event %v: %v", event, err)
+                       }
+                       wg.Done()
+               }("event")
+       }
+
+       wg.Wait() // Wait until writes complete
+
+       for i := range sinks {
+               b.Remove(sinks[i])
+       }
+
+       // sending one more should trigger test failure if they weren't removed.
+       if err := b.Write("onemore"); err != nil {
+               t.Fatalf("unexpected error sending one more: %v", err)
+       }
+
+       // add them back to test closing.
+       for i := range sinks {
+               b.Add(sinks[i])
+       }
+
+       checkClose(t, b)
+
+       // Iterate through the sinks and check that they all have the expected length.
+       for _, sink := range sinks {
+               ts := sink.(*testSink)
+               ts.mu.Lock()
+               defer ts.mu.Unlock()
+
+               if len(ts.events) != nEvents {
+                       t.Fatalf("not all events ended up in testsink: len(testSink) == %d, not %d", len(ts.events), nEvents)
+               }
+
+               if !ts.closed {
+                       t.Fatalf("sink should have been closed")
+               }
+       }
+}
+
+func BenchmarkBroadcast10(b *testing.B) {
+       benchmarkBroadcast(b, 10)
+}
+
+func BenchmarkBroadcast100(b *testing.B) {
+       benchmarkBroadcast(b, 100)
+}
+
+func BenchmarkBroadcast1000(b *testing.B) {
+       benchmarkBroadcast(b, 1000)
+}
+
+func BenchmarkBroadcast10000(b *testing.B) {
+       benchmarkBroadcast(b, 10000)
+}
+
+func benchmarkBroadcast(b *testing.B, nsinks int) {
+       // counter := metrics.NewCounter()
+       // metrics.DefaultRegistry.Register(fmt.Sprintf("nsinks: %v", nsinks), counter)
+       // go metrics.Log(metrics.DefaultRegistry, 500*time.Millisecond, log.New(os.Stderr, "metrics: ", log.LstdFlags))
+
+       b.StopTimer()
+       var sinks []Sink
+       for i := 0; i < nsinks; i++ {
+               // counter.Inc(1)
+               sinks = append(sinks, newTestSink(b, b.N))
+               // sinks = append(sinks, NewQueue(&testSink{t: b, expected: b.N}))
+       }
+       b.StartTimer()
+
+       // meter := metered{}
+       // NewQueue(meter.Egress(dst))
+
+       benchmarkSink(b, NewBroadcaster(sinks...))
+}
diff --git a/channel.go b/channel.go
new file mode 100644 (file)
index 0000000..802cf51
--- /dev/null
@@ -0,0 +1,61 @@
+package events
+
+import (
+       "fmt"
+       "sync"
+)
+
+// Channel provides a sink that can be listened on. The writer and channel
+// listener must operate in separate goroutines.
+//
+// Consumers should listen on Channel.C until Closed is closed.
+type Channel struct {
+       C chan Event
+
+       closed chan struct{}
+       once   sync.Once
+}
+
+// NewChannel returns a channel. If buffer is zero, the channel is
+// unbuffered.
+func NewChannel(buffer int) *Channel {
+       return &Channel{
+               C:      make(chan Event, buffer),
+               closed: make(chan struct{}),
+       }
+}
+
+// Done returns a channel that will always proceed once the sink is closed.
+func (ch *Channel) Done() chan struct{} {
+       return ch.closed
+}
+
+// Write the event to the channel. Must be called in a separate goroutine from
+// the listener.
+func (ch *Channel) Write(event Event) error {
+       select {
+       case ch.C <- event:
+               return nil
+       case <-ch.closed:
+               return ErrSinkClosed
+       }
+}
+
+// Close the channel sink.
+func (ch *Channel) Close() error {
+       ch.once.Do(func() {
+               close(ch.closed)
+       })
+
+       return nil
+}
+
+func (ch *Channel) String() string {
+       // Serialize a copy of the Channel that doesn't contain the sync.Once,
+       // to avoid a data race.
+       ch2 := map[string]interface{}{
+               "C":      ch.C,
+               "closed": ch.closed,
+       }
+       return fmt.Sprint(ch2)
+}
diff --git a/channel_test.go b/channel_test.go
new file mode 100644 (file)
index 0000000..63cc76d
--- /dev/null
@@ -0,0 +1,57 @@
+package events
+
+import (
+       "fmt"
+       "sync"
+       "testing"
+)
+
+func TestChannel(t *testing.T) {
+       const nevents = 100
+
+       sink := NewChannel(0)
+
+       go func() {
+               var wg sync.WaitGroup
+               for i := 1; i <= nevents; i++ {
+                       event := "event-" + fmt.Sprint(i)
+                       wg.Add(1)
+                       go func(event Event) {
+                               defer wg.Done()
+                               if err := sink.Write(event); err != nil {
+                                       t.Fatalf("error writing event: %v", err)
+                               }
+                       }(event)
+               }
+               wg.Wait()
+               sink.Close()
+
+               // now send another bunch of events and ensure we stay closed
+               for i := 1; i <= nevents; i++ {
+                       if err := sink.Write(i); err != ErrSinkClosed {
+                               t.Fatalf("unexpected error: %v != %v", err, ErrSinkClosed)
+                       }
+               }
+       }()
+
+       var received int
+loop:
+       for {
+               select {
+               case <-sink.C:
+                       received++
+               case <-sink.Done():
+                       break loop
+               }
+       }
+
+       sink.Close()
+       _, ok := <-sink.Done() // test will timeout if this hangs
+       if ok {
+               t.Fatalf("done should be a closed channel")
+       }
+
+       if received != nevents {
+               t.Fatalf("events did not make it through sink: %v != %v", received, nevents)
+       }
+}
diff --git a/common_test.go b/common_test.go
new file mode 100644 (file)
index 0000000..14f940b
--- /dev/null
@@ -0,0 +1,117 @@
+package events
+
+import (
+       "fmt"
+       "math/rand"
+       "sync"
+       "testing"
+       "time"
+)
+
+type tOrB interface {
+       Fatalf(format string, args ...interface{})
+       Logf(format string, args ...interface{})
+}
+
+type testSink struct {
+       t tOrB
+
+       events   []Event
+       expected int
+       mu       sync.Mutex
+       closed   bool
+}
+
+func newTestSink(t tOrB, expected int) *testSink {
+       return &testSink{
+               t:        t,
+               events:   make([]Event, 0, expected), // pre-allocate so we aren't benching alloc
+               expected: expected,
+       }
+}
+
+func (ts *testSink) Write(event Event) error {
+       ts.mu.Lock()
+       defer ts.mu.Unlock()
+
+       if ts.closed {
+               return ErrSinkClosed
+       }
+
+       ts.events = append(ts.events, event)
+
+       if len(ts.events) > ts.expected {
+               ts.t.Fatalf("len(ts.events) == %v, expected %v", len(ts.events), ts.expected)
+       }
+
+       return nil
+}
+
+func (ts *testSink) Close() error {
+       ts.mu.Lock()
+       defer ts.mu.Unlock()
+       if ts.closed {
+               return ErrSinkClosed
+       }
+
+       ts.closed = true
+
+       if len(ts.events) != ts.expected {
+               ts.t.Fatalf("len(ts.events) == %v, expected %v", len(ts.events), ts.expected)
+       }
+
+       return nil
+}
+
+type delayedSink struct {
+       Sink
+       delay time.Duration
+}
+
+func (ds *delayedSink) Write(event Event) error {
+       time.Sleep(ds.delay)
+       return ds.Sink.Write(event)
+}
+
+type flakySink struct {
+       Sink
+       rate float64
+       mu   sync.Mutex
+}
+
+func (fs *flakySink) Write(event Event) error {
+       fs.mu.Lock()
+       defer fs.mu.Unlock()
+
+       if rand.Float64() < fs.rate {
+               return fmt.Errorf("error writing event: %v", event)
+       }
+
+       return fs.Sink.Write(event)
+}
+
+func checkClose(t *testing.T, sink Sink) {
+       if err := sink.Close(); err != nil {
+               t.Fatalf("unexpected error closing: %v", err)
+       }
+
+       // second close should not crash but should return an error.
+       if err := sink.Close(); err != nil {
+               t.Fatalf("unexpected error on double close: %v", err)
+       }
+
+       // Write after closed should be an error
+       if err := sink.Write("fail"); err == nil {
+               t.Fatalf("write after closed did not have an error")
+       } else if err != ErrSinkClosed {
+               t.Fatalf("error should be ErrSinkClosed")
+       }
+}
+
+func benchmarkSink(b *testing.B, sink Sink) {
+       defer sink.Close()
+       var event = "myevent"
+       for i := 0; i < b.N; i++ {
+               sink.Write(event)
+       }
+}
diff --git a/errors.go b/errors.go
new file mode 100644 (file)
index 0000000..56db7c2
--- /dev/null
+++ b/errors.go
@@ -0,0 +1,10 @@
+package events
+
+import "fmt"
+
+var (
+       // ErrSinkClosed is returned if a write is issued to a sink that has been
+       // closed. If encountered, the error should be considered terminal and
+       // retries will not be successful.
+       ErrSinkClosed = fmt.Errorf("events: sink closed")
+)
diff --git a/event.go b/event.go
new file mode 100644 (file)
index 0000000..f0f1d9e
--- /dev/null
+++ b/event.go
@@ -0,0 +1,15 @@
+package events
+
+// Event marks items that can be sent as events.
+type Event interface{}
+
+// Sink accepts and sends events.
+type Sink interface {
+       // Write an event to the Sink. If no error is returned, the caller will
+       // assume that all events have been committed to the sink. If an error is
+       // received, the caller may retry sending the event.
+       Write(event Event) error
+
+       // Close the sink, possibly waiting for pending events to flush.
+       Close() error
+}
diff --git a/filter.go b/filter.go
new file mode 100644 (file)
index 0000000..e6c0eb6
--- /dev/null
+++ b/filter.go
@@ -0,0 +1,52 @@
+package events
+
+// Matcher matches events.
+type Matcher interface {
+       Match(event Event) bool
+}
+
+// MatcherFunc implements matcher with just a function.
+type MatcherFunc func(event Event) bool
+
+// Match calls the wrapped function.
+func (fn MatcherFunc) Match(event Event) bool {
+       return fn(event)
+}
+
+// Filter provides an event sink that sends only events that are accepted by a
+// Matcher. No methods on filter are goroutine safe.
+type Filter struct {
+       dst     Sink
+       matcher Matcher
+       closed  bool
+}
+
+// NewFilter returns a new filter that will send to events to dst that return
+// true for Matcher.
+func NewFilter(dst Sink, matcher Matcher) Sink {
+       return &Filter{dst: dst, matcher: matcher}
+}
+
+// Write an event to the filter.
+func (f *Filter) Write(event Event) error {
+       if f.closed {
+               return ErrSinkClosed
+       }
+
+       if f.matcher.Match(event) {
+               return f.dst.Write(event)
+       }
+
+       return nil
+}
+
+// Close the filter and allow no more events to pass through.
+func (f *Filter) Close() error {
+       // TODO(stevvooe): Not all sinks should have Close.
+       if f.closed {
+               return nil
+       }
+
+       f.closed = true
+       return f.dst.Close()
+}
diff --git a/filter_test.go b/filter_test.go
new file mode 100644 (file)
index 0000000..9f51f2d
--- /dev/null
@@ -0,0 +1,21 @@
+package events
+
+import "testing"
+
+func TestFilter(t *testing.T) {
+       const nevents = 100
+       ts := newTestSink(t, nevents/2)
+       filter := NewFilter(ts, MatcherFunc(func(event Event) bool {
+               i, ok := event.(int)
+               return ok && i%2 == 0
+       }))
+
+       for i := 0; i < nevents; i++ {
+               if err := filter.Write(i); err != nil {
+                       t.Fatalf("unexpected error writing event: %v", err)
+               }
+       }
+
+       checkClose(t, filter)
+
+}
diff --git a/queue.go b/queue.go
new file mode 100644 (file)
index 0000000..4bb770a
--- /dev/null
+++ b/queue.go
@@ -0,0 +1,111 @@
+package events
+
+import (
+       "container/list"
+       "sync"
+
+       "github.com/sirupsen/logrus"
+)
+
+// Queue accepts all messages into a queue for asynchronous consumption
+// by a sink. It is unbounded and thread safe but the sink must be reliable or
+// events will be dropped.
+type Queue struct {
+       dst    Sink
+       events *list.List
+       cond   *sync.Cond
+       mu     sync.Mutex
+       closed bool
+}
+
+// NewQueue returns a queue to the provided Sink dst.
+func NewQueue(dst Sink) *Queue {
+       eq := Queue{
+               dst:    dst,
+               events: list.New(),
+       }
+
+       eq.cond = sync.NewCond(&eq.mu)
+       go eq.run()
+       return &eq
+}
+
+// Write accepts the events into the queue, only failing if the queue has
+// been closed.
+func (eq *Queue) Write(event Event) error {
+       eq.mu.Lock()
+       defer eq.mu.Unlock()
+
+       if eq.closed {
+               return ErrSinkClosed
+       }
+
+       eq.events.PushBack(event)
+       eq.cond.Signal() // signal waiters
+
+       return nil
+}
+
+// Close shutsdown the event queue, flushing
+func (eq *Queue) Close() error {
+       eq.mu.Lock()
+       defer eq.mu.Unlock()
+
+       if eq.closed {
+               return nil
+       }
+
+       // set closed flag
+       eq.closed = true
+       eq.cond.Signal() // signal flushes queue
+       eq.cond.Wait()   // wait for signal from last flush
+       return eq.dst.Close()
+}
+
+// run is the main goroutine to flush events to the target sink.
+func (eq *Queue) run() {
+       for {
+               event := eq.next()
+
+               if event == nil {
+                       return // nil block means event queue is closed.
+               }
+
+               if err := eq.dst.Write(event); err != nil {
+                       // TODO(aaronl): Dropping events could be bad depending
+                       // on the application. We should have a way of
+                       // communicating this condition. However, logging
+                       // at a log level above debug may not be appropriate.
+                       // Eventually, go-events should not use logrus at all,
+                       // and should bubble up conditions like this through
+                       // error values.
+                       logrus.WithFields(logrus.Fields{
+                               "event": event,
+                               "sink":  eq.dst,
+                       }).WithError(err).Debug("eventqueue: dropped event")
+               }
+       }
+}
+
+// next encompasses the critical section of the run loop. When the queue is
+// empty, it will block on the condition. If new data arrives, it will wake
+// and return a block. When closed, a nil slice will be returned.
+func (eq *Queue) next() Event {
+       eq.mu.Lock()
+       defer eq.mu.Unlock()
+
+       for eq.events.Len() < 1 {
+               if eq.closed {
+                       eq.cond.Broadcast()
+                       return nil
+               }
+
+               eq.cond.Wait()
+       }
+
+       front := eq.events.Front()
+       block := front.Value.(Event)
+       eq.events.Remove(front)
+
+       return block
+}
diff --git a/queue_test.go b/queue_test.go
new file mode 100644 (file)
index 0000000..7bfe0e3
--- /dev/null
@@ -0,0 +1,46 @@
+package events
+
+import (
+       "fmt"
+       "sync"
+       "testing"
+       "time"
+)
+
+func TestQueue(t *testing.T) {
+       const nevents = 1000
+
+       ts := newTestSink(t, nevents)
+       eq := NewQueue(
+               // delayed sync simulates destination slower than channel comms
+               &delayedSink{
+                       Sink:  ts,
+                       delay: time.Millisecond * 1,
+               })
+       time.Sleep(10 * time.Millisecond) // let's queue settle to wait conidition.
+
+       var wg sync.WaitGroup
+       for i := 1; i <= nevents; i++ {
+               wg.Add(1)
+               go func(event Event) {
+                       if err := eq.Write(event); err != nil {
+                               t.Fatalf("error writing event: %v", err)
+                       }
+                       wg.Done()
+               }("event-" + fmt.Sprint(i))
+       }
+
+       wg.Wait()
+       checkClose(t, eq)
+
+       ts.mu.Lock()
+       defer ts.mu.Unlock()
+
+       if len(ts.events) != nevents {
+               t.Fatalf("events did not make it to the sink: %d != %d", len(ts.events), 1000)
+       }
+
+       if !ts.closed {
+               t.Fatalf("sink should have been closed")
+       }
+}
diff --git a/retry.go b/retry.go
new file mode 100644 (file)
index 0000000..2df55d2
--- /dev/null
+++ b/retry.go
@@ -0,0 +1,260 @@
+package events
+
+import (
+       "fmt"
+       "math/rand"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/sirupsen/logrus"
+)
+
+// RetryingSink retries the write until success or an ErrSinkClosed is
+// returned. Underlying sink must have p > 0 of succeeding or the sink will
+// block. Retry is configured with a RetryStrategy.  Concurrent calls to a
+// retrying sink are serialized through the sink, meaning that if one is
+// in-flight, another will not proceed.
+type RetryingSink struct {
+       sink     Sink
+       strategy RetryStrategy
+       closed   chan struct{}
+       once     sync.Once
+}
+
+// NewRetryingSink returns a sink that will retry writes to a sink, backing
+// off on failure. Parameters threshold and backoff adjust the behavior of the
+// circuit breaker.
+func NewRetryingSink(sink Sink, strategy RetryStrategy) *RetryingSink {
+       rs := &RetryingSink{
+               sink:     sink,
+               strategy: strategy,
+               closed:   make(chan struct{}),
+       }
+
+       return rs
+}
+
+// Write attempts to flush the events to the downstream sink until it succeeds
+// or the sink is closed.
+func (rs *RetryingSink) Write(event Event) error {
+       logger := logrus.WithField("event", event)
+
+retry:
+       select {
+       case <-rs.closed:
+               return ErrSinkClosed
+       default:
+       }
+
+       if backoff := rs.strategy.Proceed(event); backoff > 0 {
+               select {
+               case <-time.After(backoff):
+                       // TODO(stevvooe): This branch holds up the next try. Before, we
+                       // would simply break to the "retry" label and then possibly wait
+                       // again. However, this requires all retry strategies to have a
+                       // large probability of probing the sync for success, rather than
+                       // just backing off and sending the request.
+               case <-rs.closed:
+                       return ErrSinkClosed
+               }
+       }
+
+       if err := rs.sink.Write(event); err != nil {
+               if err == ErrSinkClosed {
+                       // terminal!
+                       return err
+               }
+
+               logger := logger.WithError(err) // shadow!!
+
+               if rs.strategy.Failure(event, err) {
+                       logger.Errorf("retryingsink: dropped event")
+                       return nil
+               }
+
+               logger.Errorf("retryingsink: error writing event, retrying")
+               goto retry
+       }
+
+       rs.strategy.Success(event)
+       return nil
+}
+
+// Close closes the sink and the underlying sink.
+func (rs *RetryingSink) Close() error {
+       rs.once.Do(func() {
+               close(rs.closed)
+       })
+
+       return nil
+}
+
+func (rs *RetryingSink) String() string {
+       // Serialize a copy of the RetryingSink without the sync.Once, to avoid
+       // a data race.
+       rs2 := map[string]interface{}{
+               "sink":     rs.sink,
+               "strategy": rs.strategy,
+               "closed":   rs.closed,
+       }
+       return fmt.Sprint(rs2)
+}
+
+// RetryStrategy defines a strategy for retrying event sink writes.
+//
+// All methods should be goroutine safe.
+type RetryStrategy interface {
+       // Proceed is called before every event send. If proceed returns a
+       // positive, non-zero integer, the retryer will back off by the provided
+       // duration.
+       //
+       // An event is provided, by may be ignored.
+       Proceed(event Event) time.Duration
+
+       // Failure reports a failure to the strategy. If this method returns true,
+       // the event should be dropped.
+       Failure(event Event, err error) bool
+
+       // Success should be called when an event is sent successfully.
+       Success(event Event)
+}
+
+// Breaker implements a circuit breaker retry strategy.
+//
+// The current implementation never drops events.
+type Breaker struct {
+       threshold int
+       recent    int
+       last      time.Time
+       backoff   time.Duration // time after which we retry after failure.
+       mu        sync.Mutex
+}
+
+var _ RetryStrategy = &Breaker{}
+
+// NewBreaker returns a breaker that will backoff after the threshold has been
+// tripped. A Breaker is thread safe and may be shared by many goroutines.
+func NewBreaker(threshold int, backoff time.Duration) *Breaker {
+       return &Breaker{
+               threshold: threshold,
+               backoff:   backoff,
+       }
+}
+
+// Proceed checks the failures against the threshold.
+func (b *Breaker) Proceed(event Event) time.Duration {
+       b.mu.Lock()
+       defer b.mu.Unlock()
+
+       if b.recent < b.threshold {
+               return 0
+       }
+
+       return b.last.Add(b.backoff).Sub(time.Now())
+}
+
+// Success resets the breaker.
+func (b *Breaker) Success(event Event) {
+       b.mu.Lock()
+       defer b.mu.Unlock()
+
+       b.recent = 0
+       b.last = time.Time{}
+}
+
+// Failure records the failure and latest failure time.
+func (b *Breaker) Failure(event Event, err error) bool {
+       b.mu.Lock()
+       defer b.mu.Unlock()
+
+       b.recent++
+       b.last = time.Now().UTC()
+       return false // never drop events.
+}
+
+var (
+       // DefaultExponentialBackoffConfig provides a default configuration for
+       // exponential backoff.
+       DefaultExponentialBackoffConfig = ExponentialBackoffConfig{
+               Base:   time.Second,
+               Factor: time.Second,
+               Max:    20 * time.Second,
+       }
+)
+
+// ExponentialBackoffConfig configures backoff parameters.
+//
+// Note that these parameters operate on the upper bound for choosing a random
+// value. For example, at Base=1s, a random value in [0,1s) will be chosen for
+// the backoff value.
+type ExponentialBackoffConfig struct {
+       // Base is the minimum bound for backing off after failure.
+       Base time.Duration
+
+       // Factor sets the amount of time by which the backoff grows with each
+       // failure.
+       Factor time.Duration
+
+       // Max is the absolute maxiumum bound for a single backoff.
+       Max time.Duration
+}
+
+// ExponentialBackoff implements random backoff with exponentially increasing
+// bounds as the number consecutive failures increase.
+type ExponentialBackoff struct {
+       config   ExponentialBackoffConfig
+       failures uint64 // consecutive failure counter.
+}
+
+// NewExponentialBackoff returns an exponential backoff strategy with the
+// desired config. If config is nil, the default is returned.
+func NewExponentialBackoff(config ExponentialBackoffConfig) *ExponentialBackoff {
+       return &ExponentialBackoff{
+               config: config,
+       }
+}
+
+// Proceed returns the next randomly bound exponential backoff time.
+func (b *ExponentialBackoff) Proceed(event Event) time.Duration {
+       return b.backoff(atomic.LoadUint64(&b.failures))
+}
+
+// Success resets the failures counter.
+func (b *ExponentialBackoff) Success(event Event) {
+       atomic.StoreUint64(&b.failures, 0)
+}
+
+// Failure increments the failure counter.
+func (b *ExponentialBackoff) Failure(event Event, err error) bool {
+       atomic.AddUint64(&b.failures, 1)
+       return false
+}
+
+// backoff calculates the amount of time to wait based on the number of
+// consecutive failures.
+func (b *ExponentialBackoff) backoff(failures uint64) time.Duration {
+       if failures <= 0 {
+               // proceed normally when there are no failures.
+               return 0
+       }
+
+       factor := b.config.Factor
+       if factor <= 0 {
+               factor = DefaultExponentialBackoffConfig.Factor
+       }
+
+       backoff := b.config.Base + factor*time.Duration(1<<(failures-1))
+
+       max := b.config.Max
+       if max <= 0 {
+               max = DefaultExponentialBackoffConfig.Max
+       }
+
+       if backoff > max || backoff < 0 {
+               backoff = max
+       }
+
+       // Choose a uniformly distributed value from [0, backoff).
+       return time.Duration(rand.Int63n(int64(backoff)))
+}
diff --git a/retry_test.go b/retry_test.go
new file mode 100644 (file)
index 0000000..eba344e
--- /dev/null
@@ -0,0 +1,96 @@
+package events
+
+import (
+       "fmt"
+       "sync"
+       "testing"
+       "time"
+)
+
+func TestRetryingSinkBreaker(t *testing.T) {
+       testRetryingSink(t, NewBreaker(3, 10*time.Millisecond))
+}
+
+func TestRetryingSinkExponentialBackoff(t *testing.T) {
+       testRetryingSink(t, NewExponentialBackoff(ExponentialBackoffConfig{
+               Base:   time.Millisecond,
+               Factor: time.Millisecond,
+               Max:    time.Millisecond * 5,
+       }))
+}
+
+func testRetryingSink(t *testing.T, strategy RetryStrategy) {
+       const nevents = 100
+       ts := newTestSink(t, nevents)
+
+       // Make a sync that fails most of the time, ensuring that all the events
+       // make it through.
+       flaky := &flakySink{
+               rate: 1.0, // start out always failing.
+               Sink: ts,
+       }
+
+       s := NewRetryingSink(flaky, strategy)
+
+       var wg sync.WaitGroup
+       for i := 1; i <= nevents; i++ {
+               event := "myevent-" + fmt.Sprint(i)
+
+               // Above 50, set the failure rate lower
+               if i > 50 {
+                       flaky.mu.Lock()
+                       flaky.rate = 0.9
+                       flaky.mu.Unlock()
+               }
+
+               wg.Add(1)
+               go func(event Event) {
+                       defer wg.Done()
+                       if err := s.Write(event); err != nil {
+                               t.Fatalf("error writing event: %v", err)
+                       }
+               }(event)
+       }
+
+       wg.Wait()
+       checkClose(t, s)
+
+       ts.mu.Lock()
+       defer ts.mu.Unlock()
+}
+
+func TestExponentialBackoff(t *testing.T) {
+       strategy := NewExponentialBackoff(DefaultExponentialBackoffConfig)
+       backoff := strategy.Proceed(nil)
+
+       if backoff != 0 {
+               t.Errorf("untouched backoff should be zero-wait: %v != 0", backoff)
+       }
+
+       expected := strategy.config.Base + strategy.config.Factor
+       for i := 1; i <= 10; i++ {
+               if strategy.Failure(nil, nil) {
+                       t.Errorf("no facilities for dropping events in ExponentialBackoff")
+               }
+
+               for j := 0; j < 1000; j++ {
+                       // sample this several thousand times.
+                       backoff := strategy.Proceed(nil)
+                       if backoff > expected {
+                               t.Fatalf("expected must be bounded by %v after %v failures: %v", expected, i, backoff)
+                       }
+               }
+
+               expected = strategy.config.Base + strategy.config.Factor*time.Duration(1<<uint64(i))
+               if expected > strategy.config.Max {
+                       expected = strategy.config.Max
+               }
+       }
+
+       strategy.Success(nil) // recovery!
+
+       backoff = strategy.Proceed(nil)
+       if backoff != 0 {
+               t.Errorf("should have recovered: %v != 0", backoff)
+       }
+}