--- /dev/null
+# Contributing to Docker open source projects
+
+Want to hack on go-events? Awesome! Here are instructions to get you started.
+
+go-events is part of the [Docker](https://www.docker.com) project, and
+follows the same rules and principles. If you're already familiar with the way
+Docker does things, you'll feel right at home.
+
+Otherwise, go read Docker's
+[contributing guidelines](https://github.com/docker/docker/blob/master/CONTRIBUTING.md),
+[issue triaging](https://github.com/docker/docker/blob/master/project/ISSUE-TRIAGE.md),
+[review process](https://github.com/docker/docker/blob/master/project/REVIEWING.md) and
+[branches and tags](https://github.com/docker/docker/blob/master/project/BRANCHES-AND-TAGS.md).
+
+For an in-depth description of our contribution process, visit the
+contributors guide: [Understand how to contribute](https://docs.docker.com/opensource/workflow/make-a-contribution/)
+
+### Sign your work
+
+The sign-off is a simple line at the end of the explanation for the patch. Your
+signature certifies that you wrote the patch or otherwise have the right to pass
+it on as an open-source patch. The rules are pretty simple: if you can certify
+the below (from [developercertificate.org](http://developercertificate.org/)):
+
+```
+Developer Certificate of Origin
+Version 1.1
+
+Copyright (C) 2004, 2006 The Linux Foundation and its contributors.
+660 York Street, Suite 102,
+San Francisco, CA 94110 USA
+
+Everyone is permitted to copy and distribute verbatim copies of this
+license document, but changing it is not allowed.
+
+Developer's Certificate of Origin 1.1
+
+By making a contribution to this project, I certify that:
+
+(a) The contribution was created in whole or in part by me and I
+ have the right to submit it under the open source license
+ indicated in the file; or
+
+(b) The contribution is based upon previous work that, to the best
+ of my knowledge, is covered under an appropriate open source
+ license and I have the right under that license to submit that
+ work with modifications, whether created in whole or in part
+ by me, under the same open source license (unless I am
+ permitted to submit under a different license), as indicated
+ in the file; or
+
+(c) The contribution was provided directly to me by some other
+ person who certified (a), (b) or (c) and I have not modified
+ it.
+
+(d) I understand and agree that this project and the contribution
+ are public and that a record of the contribution (including all
+ personal information I submit with it, including my sign-off) is
+ maintained indefinitely and may be redistributed consistent with
+ this project or the open source license(s) involved.
+```
+
+Then you just add a line to every git commit message:
+
+ Signed-off-by: Joe Smith <joe.smith@email.com>
+
+Use your real name (sorry, no pseudonyms or anonymous contributions).
+
+If you set your `user.name` and `user.email` git configs, you can sign your
+commit automatically with `git commit -s`.
--- /dev/null
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "{}"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright 2016 Docker, Inc.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
--- /dev/null
+# go-events maintainers file
+#
+# This file describes who runs the docker/go-events project and how.
+# This is a living document - if you see something out of date or missing, speak up!
+#
+# It is structured to be consumable by both humans and programs.
+# To extract its contents programmatically, use any TOML-compliant parser.
+#
+# This file is compiled into the MAINTAINERS file in docker/opensource.
+#
+[Org]
+ [Org."Core maintainers"]
+ people = [
+ "aaronlehmann",
+ "aluzzardi",
+ "lk4d4",
+ "stevvooe",
+ ]
+
+[people]
+
+# A reference list of all people associated with the project.
+# All other sections should refer to people by their canonical key
+# in the people section.
+
+ # ADD YOURSELF HERE IN ALPHABETICAL ORDER
+
+ [people.aaronlehmann]
+ Name = "Aaron Lehmann"
+ Email = "aaron.lehmann@docker.com"
+ GitHub = "aaronlehmann"
+
+ [people.aluzzardi]
+ Name = "Andrea Luzzardi"
+ Email = "al@docker.com"
+ GitHub = "aluzzardi"
+
+ [people.lk4d4]
+ Name = "Alexander Morozov"
+ Email = "lk4d4@docker.com"
+ GitHub = "lk4d4"
+
+ [people.stevvooe]
+ Name = "Stephen Day"
+ Email = "stephen.day@docker.com"
+ GitHub = "stevvooe"
--- /dev/null
+# Docker Events Package
+
+[GoDoc](https://godoc.org/github.com/docker/go-events)
+[Circle CI](https://circleci.com/gh/docker/go-events)
+
+The Docker `events` package implements composable event distribution for Go.
+
+Originally created to implement the [notifications in Docker Registry
+2](https://github.com/docker/distribution/blob/master/docs/notifications.md),
+we've found the pattern to be useful in other applications. This package is
+most of the same code with slightly updated interfaces. Much of the internals
+have been made available.
+
+## Usage
+
+The `events` package centers around a `Sink` type. Events are written with
+calls to `Sink.Write(event Event)`. Sinks can be wired up in various
+configurations to achieve interesting behavior.
+
+The canonical example is that employed by the
+[docker/distribution/notifications](https://godoc.org/github.com/docker/distribution/notifications)
+package. Let's say we have a type `httpSink` where we'd like to queue
+notifications. As a rule, it should send a single http request and return an
+error if it fails:
+
+```go
+func (h *httpSink) Write(event Event) error {
+ p, err := json.Marshal(event)
+ if err != nil {
+ return err
+ }
+ body := bytes.NewReader(p)
+ resp, err := h.client.Post(h.url, "application/json", body)
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+
+	if resp.StatusCode != 200 {
+ return errors.New("unexpected status")
+ }
+
+ return nil
+}
+
+// implement (*httpSink).Close()
+```
+
+With just that, we can start using components from this package. One can call
+`(*httpSink).Write` to send events as the body of a post request to a
+configured URL.
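+
+For completeness, a minimal sketch of such an `httpSink` is shown below; the
+fields and the `newHTTPSink` constructor are illustrative assumptions rather
+than part of this package:
+
+```go
+type httpSink struct {
+	client *http.Client
+	url    string
+}
+
+func newHTTPSink(url string) *httpSink {
+	return &httpSink{client: http.DefaultClient, url: url}
+}
+
+func (h *httpSink) Close() error {
+	// Nothing to release in this sketch; a real implementation might wait
+	// for in-flight requests here.
+	return nil
+}
+```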
+
+### Retries
+
+HTTP can be unreliable. The first feature we'd like is to have some retry:
+
+```go
+hs := newHTTPSink(/*...*/)
+retry := NewRetryingSink(hs, NewBreaker(5, time.Second))
+```
+
+We now have a sink that will retry events against the `httpSink` until they
+succeed. With the breaker strategy, the retry backs off for one second after 5
+consecutive failures before trying again.
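+
+An exponential backoff strategy is also available if a fixed one-second delay
+is too rigid; the values below are illustrative, not recommendations:
+
+```go
+retry := NewRetryingSink(hs, NewExponentialBackoff(ExponentialBackoffConfig{
+	Base:   time.Second,      // minimum backoff after a failure
+	Factor: time.Second,      // growth per consecutive failure
+	Max:    20 * time.Second, // cap on a single backoff
+}))
+```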
+
+### Queues
+
+This isn't quite enough. We want a sink that doesn't block while we are
+waiting for events to be sent. Let's add a `Queue`:
+
+```go
+queue := NewQueue(retry)
+```
+
+Now, we have an unbounded queue that will work through all events sent with
+`(*Queue).Write`. Events can be added asynchronously to the queue without
+blocking the current execution path. This is ideal for use in an http request.
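+
+Because the queue is unbounded, it should be closed during shutdown; `Close`
+flushes anything still buffered to the underlying sink before returning. A
+minimal sketch:
+
+```go
+// During shutdown, flush buffered events to the retrying sink and close it.
+if err := queue.Close(); err != nil {
+	// handle or log the error as appropriate for your application
+}
+```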
+
+### Broadcast
+
+It usually turns out that you want to send to more than one listener. We can
+use `Broadcaster` to support this:
+
+```go
+var broadcast = NewBroadcaster() // make it available somewhere in your application.
+broadcast.Add(queue) // add your queue!
+broadcast.Add(queue2) // and another!
+```
+
+With the above, we can now call `broadcast.Write` in our http handlers and have
+all the events distributed to each queue. Because the events are queued, no
+listener blocks another.
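+
+As a sketch, an HTTP handler might look like the following; the handler and
+the event value are hypothetical, and `broadcast` is the broadcaster created
+above:
+
+```go
+func handleThing(w http.ResponseWriter, r *http.Request) {
+	// ... do the real work of the request ...
+
+	// Dispatch an event to every registered queue. Slow listeners do not
+	// block this call because each one sits behind its own Queue.
+	if err := broadcast.Write("thing-happened"); err != nil {
+		log.Printf("dropping event: %v", err)
+	}
+
+	w.WriteHeader(http.StatusNoContent)
+}
+```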
+
+### Extending
+
+For the most part, the above is sufficient for a lot of applications. However,
+the functionality can be extended by implementing your own `Sink`. The
+behavior and semantics of the sink can be completely dependent on the
+application requirements. The interface is provided below for reference:
+
+```go
+type Sink interface {
+ Write(Event) error
+ Close() error
+}
+```
+
+Application behavior can be controlled by how `Write` behaves. The examples
+above are designed to queue the message and return as quickly as possible.
+Other implementations may block until the event is committed to durable
+storage.
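+
+As one example of a custom sink, the hypothetical `logSink` below simply
+writes each event to the standard logger; it is a sketch, not part of this
+package:
+
+```go
+type logSink struct {
+	mu     sync.Mutex
+	closed bool
+}
+
+func (s *logSink) Write(event Event) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.closed {
+		return ErrSinkClosed
+	}
+	log.Printf("event: %v", event)
+	return nil
+}
+
+func (s *logSink) Close() error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.closed = true
+	return nil
+}
+```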
+
+## Copyright and license
+
+Copyright © 2016 Docker, Inc. go-events is licensed under the Apache License,
+Version 2.0. See [LICENSE](LICENSE) for the full license text.
--- /dev/null
+package events
+
+import (
+ "fmt"
+ "sync"
+
+ "github.com/sirupsen/logrus"
+)
+
+// Broadcaster sends events to multiple, reliable Sinks. The goal of this
+// component is to dispatch events to configured endpoints. Reliability can be
+// provided by wrapping incoming sinks.
+type Broadcaster struct {
+ sinks []Sink
+ events chan Event
+ adds chan configureRequest
+ removes chan configureRequest
+
+ shutdown chan struct{}
+ closed chan struct{}
+ once sync.Once
+}
+
+// NewBroadcaster returns a broadcaster that dispatches events to the provided
+// sinks. The broadcaster's behavior will be affected by the properties of each
+// sink. Generally, a sink should accept all messages and deal with reliability
+// on its own; wrapping sinks with Queue and RetryingSink is appropriate here.
+func NewBroadcaster(sinks ...Sink) *Broadcaster {
+ b := Broadcaster{
+ sinks: sinks,
+ events: make(chan Event),
+ adds: make(chan configureRequest),
+ removes: make(chan configureRequest),
+ shutdown: make(chan struct{}),
+ closed: make(chan struct{}),
+ }
+
+ // Start the broadcaster
+ go b.run()
+
+ return &b
+}
+
+// Write accepts an event to be dispatched to all sinks. It fails only if the
+// broadcaster has been closed and should never block. The caller cedes
+// ownership of the event to the broadcaster and must not modify it after
+// calling Write.
+func (b *Broadcaster) Write(event Event) error {
+ select {
+ case b.events <- event:
+ case <-b.closed:
+ return ErrSinkClosed
+ }
+ return nil
+}
+
+// Add the sink to the broadcaster.
+//
+// The provided sink must be comparable with equality. Typically, this just
+// works with a regular pointer type.
+func (b *Broadcaster) Add(sink Sink) error {
+ return b.configure(b.adds, sink)
+}
+
+// Remove the provided sink.
+func (b *Broadcaster) Remove(sink Sink) error {
+ return b.configure(b.removes, sink)
+}
+
+type configureRequest struct {
+ sink Sink
+ response chan error
+}
+
+func (b *Broadcaster) configure(ch chan configureRequest, sink Sink) error {
+ response := make(chan error, 1)
+
+ for {
+ select {
+ case ch <- configureRequest{
+ sink: sink,
+ response: response}:
+ ch = nil
+ case err := <-response:
+ return err
+ case <-b.closed:
+ return ErrSinkClosed
+ }
+ }
+}
+
+// Close the broadcaster, ensuring that all messages are flushed to the
+// underlying sink before returning.
+func (b *Broadcaster) Close() error {
+ b.once.Do(func() {
+ close(b.shutdown)
+ })
+
+ <-b.closed
+ return nil
+}
+
+// run is the main broadcast loop, started when the broadcaster is created.
+// Under normal conditions, it waits for events on the event channel. After
+// Close is called, this goroutine will exit.
+func (b *Broadcaster) run() {
+ defer close(b.closed)
+ remove := func(target Sink) {
+ for i, sink := range b.sinks {
+ if sink == target {
+ b.sinks = append(b.sinks[:i], b.sinks[i+1:]...)
+ break
+ }
+ }
+ }
+
+ for {
+ select {
+ case event := <-b.events:
+ for _, sink := range b.sinks {
+ if err := sink.Write(event); err != nil {
+ if err == ErrSinkClosed {
+ // remove closed sinks
+ remove(sink)
+ continue
+ }
+ logrus.WithField("event", event).WithField("events.sink", sink).WithError(err).
+ Errorf("broadcaster: dropping event")
+ }
+ }
+ case request := <-b.adds:
+			// Adds and removes require a linear scan, but the common send
+			// path benefits from fast iteration over a slice.
+
+ var found bool
+ for _, sink := range b.sinks {
+ if request.sink == sink {
+ found = true
+ break
+ }
+ }
+
+ if !found {
+ b.sinks = append(b.sinks, request.sink)
+ }
+ request.response <- nil
+ case request := <-b.removes:
+ remove(request.sink)
+ request.response <- nil
+ case <-b.shutdown:
+ // close all the underlying sinks
+ for _, sink := range b.sinks {
+ if err := sink.Close(); err != nil && err != ErrSinkClosed {
+ logrus.WithField("events.sink", sink).WithError(err).
+ Errorf("broadcaster: closing sink failed")
+ }
+ }
+ return
+ }
+ }
+}
+
+func (b *Broadcaster) String() string {
+ // Serialize copy of this broadcaster without the sync.Once, to avoid
+ // a data race.
+
+ b2 := map[string]interface{}{
+ "sinks": b.sinks,
+ "events": b.events,
+ "adds": b.adds,
+ "removes": b.removes,
+
+ "shutdown": b.shutdown,
+ "closed": b.closed,
+ }
+
+ return fmt.Sprint(b2)
+}
--- /dev/null
+package events
+
+import (
+ "sync"
+ "testing"
+)
+
+func TestBroadcaster(t *testing.T) {
+ const nEvents = 1000
+ var sinks []Sink
+ b := NewBroadcaster()
+ for i := 0; i < 10; i++ {
+ sinks = append(sinks, newTestSink(t, nEvents))
+ b.Add(sinks[i])
+ b.Add(sinks[i]) // noop
+ }
+
+ var wg sync.WaitGroup
+ for i := 1; i <= nEvents; i++ {
+ wg.Add(1)
+ go func(event Event) {
+ if err := b.Write(event); err != nil {
+ t.Fatalf("error writing event %v: %v", event, err)
+ }
+ wg.Done()
+ }("event")
+ }
+
+ wg.Wait() // Wait until writes complete
+
+ for i := range sinks {
+ b.Remove(sinks[i])
+ }
+
+ // sending one more should trigger test failure if they weren't removed.
+ if err := b.Write("onemore"); err != nil {
+ t.Fatalf("unexpected error sending one more: %v", err)
+ }
+
+ // add them back to test closing.
+ for i := range sinks {
+ b.Add(sinks[i])
+ }
+
+ checkClose(t, b)
+
+ // Iterate through the sinks and check that they all have the expected length.
+ for _, sink := range sinks {
+ ts := sink.(*testSink)
+ ts.mu.Lock()
+ defer ts.mu.Unlock()
+
+ if len(ts.events) != nEvents {
+ t.Fatalf("not all events ended up in testsink: len(testSink) == %d, not %d", len(ts.events), nEvents)
+ }
+
+ if !ts.closed {
+ t.Fatalf("sink should have been closed")
+ }
+ }
+}
+
+func BenchmarkBroadcast10(b *testing.B) {
+ benchmarkBroadcast(b, 10)
+}
+
+func BenchmarkBroadcast100(b *testing.B) {
+ benchmarkBroadcast(b, 100)
+}
+
+func BenchmarkBroadcast1000(b *testing.B) {
+ benchmarkBroadcast(b, 1000)
+}
+
+func BenchmarkBroadcast10000(b *testing.B) {
+ benchmarkBroadcast(b, 10000)
+}
+
+func benchmarkBroadcast(b *testing.B, nsinks int) {
+ // counter := metrics.NewCounter()
+ // metrics.DefaultRegistry.Register(fmt.Sprintf("nsinks: %v", nsinks), counter)
+ // go metrics.Log(metrics.DefaultRegistry, 500*time.Millisecond, log.New(os.Stderr, "metrics: ", log.LstdFlags))
+
+ b.StopTimer()
+ var sinks []Sink
+ for i := 0; i < nsinks; i++ {
+ // counter.Inc(1)
+ sinks = append(sinks, newTestSink(b, b.N))
+ // sinks = append(sinks, NewQueue(&testSink{t: b, expected: b.N}))
+ }
+ b.StartTimer()
+
+ // meter := metered{}
+ // NewQueue(meter.Egress(dst))
+
+ benchmarkSink(b, NewBroadcaster(sinks...))
+}
--- /dev/null
+package events
+
+import (
+ "fmt"
+ "sync"
+)
+
+// Channel provides a sink that can be listened on. The writer and channel
+// listener must operate in separate goroutines.
+//
+// Consumers should listen on Channel.C until Closed is closed.
+type Channel struct {
+ C chan Event
+
+ closed chan struct{}
+ once sync.Once
+}
+
+// NewChannel returns a channel. If buffer is zero, the channel is
+// unbuffered.
+func NewChannel(buffer int) *Channel {
+ return &Channel{
+ C: make(chan Event, buffer),
+ closed: make(chan struct{}),
+ }
+}
+
+// Done returns a channel that will always proceed once the sink is closed.
+func (ch *Channel) Done() chan struct{} {
+ return ch.closed
+}
+
+// Write the event to the channel. Must be called in a separate goroutine from
+// the listener.
+func (ch *Channel) Write(event Event) error {
+ select {
+ case ch.C <- event:
+ return nil
+ case <-ch.closed:
+ return ErrSinkClosed
+ }
+}
+
+// Close the channel sink.
+func (ch *Channel) Close() error {
+ ch.once.Do(func() {
+ close(ch.closed)
+ })
+
+ return nil
+}
+
+func (ch *Channel) String() string {
+ // Serialize a copy of the Channel that doesn't contain the sync.Once,
+ // to avoid a data race.
+ ch2 := map[string]interface{}{
+ "C": ch.C,
+ "closed": ch.closed,
+ }
+ return fmt.Sprint(ch2)
+}
--- /dev/null
+package events
+
+import (
+ "fmt"
+ "sync"
+ "testing"
+)
+
+func TestChannel(t *testing.T) {
+ const nevents = 100
+
+ sink := NewChannel(0)
+
+ go func() {
+ var wg sync.WaitGroup
+ for i := 1; i <= nevents; i++ {
+ event := "event-" + fmt.Sprint(i)
+ wg.Add(1)
+ go func(event Event) {
+ defer wg.Done()
+ if err := sink.Write(event); err != nil {
+ t.Fatalf("error writing event: %v", err)
+ }
+ }(event)
+ }
+ wg.Wait()
+ sink.Close()
+
+ // now send another bunch of events and ensure we stay closed
+ for i := 1; i <= nevents; i++ {
+ if err := sink.Write(i); err != ErrSinkClosed {
+ t.Fatalf("unexpected error: %v != %v", err, ErrSinkClosed)
+ }
+ }
+ }()
+
+ var received int
+loop:
+ for {
+ select {
+ case <-sink.C:
+ received++
+ case <-sink.Done():
+ break loop
+ }
+ }
+
+ sink.Close()
+ _, ok := <-sink.Done() // test will timeout if this hangs
+ if ok {
+ t.Fatalf("done should be a closed channel")
+ }
+
+ if received != nevents {
+ t.Fatalf("events did not make it through sink: %v != %v", received, nevents)
+ }
+}
--- /dev/null
+package events
+
+import (
+ "fmt"
+ "math/rand"
+ "sync"
+ "testing"
+ "time"
+)
+
+type tOrB interface {
+ Fatalf(format string, args ...interface{})
+ Logf(format string, args ...interface{})
+}
+
+type testSink struct {
+ t tOrB
+
+ events []Event
+ expected int
+ mu sync.Mutex
+ closed bool
+}
+
+func newTestSink(t tOrB, expected int) *testSink {
+ return &testSink{
+ t: t,
+ events: make([]Event, 0, expected), // pre-allocate so we aren't benching alloc
+ expected: expected,
+ }
+}
+
+func (ts *testSink) Write(event Event) error {
+ ts.mu.Lock()
+ defer ts.mu.Unlock()
+
+ if ts.closed {
+ return ErrSinkClosed
+ }
+
+ ts.events = append(ts.events, event)
+
+ if len(ts.events) > ts.expected {
+ ts.t.Fatalf("len(ts.events) == %v, expected %v", len(ts.events), ts.expected)
+ }
+
+ return nil
+}
+
+func (ts *testSink) Close() error {
+ ts.mu.Lock()
+ defer ts.mu.Unlock()
+ if ts.closed {
+ return ErrSinkClosed
+ }
+
+ ts.closed = true
+
+ if len(ts.events) != ts.expected {
+ ts.t.Fatalf("len(ts.events) == %v, expected %v", len(ts.events), ts.expected)
+ }
+
+ return nil
+}
+
+type delayedSink struct {
+ Sink
+ delay time.Duration
+}
+
+func (ds *delayedSink) Write(event Event) error {
+ time.Sleep(ds.delay)
+ return ds.Sink.Write(event)
+}
+
+type flakySink struct {
+ Sink
+ rate float64
+ mu sync.Mutex
+}
+
+func (fs *flakySink) Write(event Event) error {
+ fs.mu.Lock()
+ defer fs.mu.Unlock()
+
+ if rand.Float64() < fs.rate {
+ return fmt.Errorf("error writing event: %v", event)
+ }
+
+ return fs.Sink.Write(event)
+}
+
+func checkClose(t *testing.T, sink Sink) {
+ if err := sink.Close(); err != nil {
+ t.Fatalf("unexpected error closing: %v", err)
+ }
+
+	// second close should not crash and should not return an error.
+ if err := sink.Close(); err != nil {
+ t.Fatalf("unexpected error on double close: %v", err)
+ }
+
+ // Write after closed should be an error
+ if err := sink.Write("fail"); err == nil {
+ t.Fatalf("write after closed did not have an error")
+ } else if err != ErrSinkClosed {
+ t.Fatalf("error should be ErrSinkClosed")
+ }
+}
+
+func benchmarkSink(b *testing.B, sink Sink) {
+ defer sink.Close()
+ var event = "myevent"
+ for i := 0; i < b.N; i++ {
+ sink.Write(event)
+ }
+}
--- /dev/null
+package events
+
+import "fmt"
+
+var (
+ // ErrSinkClosed is returned if a write is issued to a sink that has been
+ // closed. If encountered, the error should be considered terminal and
+ // retries will not be successful.
+ ErrSinkClosed = fmt.Errorf("events: sink closed")
+)
--- /dev/null
+package events
+
+// Event marks items that can be sent as events.
+type Event interface{}
+
+// Sink accepts and sends events.
+type Sink interface {
+ // Write an event to the Sink. If no error is returned, the caller will
+ // assume that all events have been committed to the sink. If an error is
+ // received, the caller may retry sending the event.
+ Write(event Event) error
+
+ // Close the sink, possibly waiting for pending events to flush.
+ Close() error
+}
--- /dev/null
+package events
+
+// Matcher matches events.
+type Matcher interface {
+ Match(event Event) bool
+}
+
+// MatcherFunc implements matcher with just a function.
+type MatcherFunc func(event Event) bool
+
+// Match calls the wrapped function.
+func (fn MatcherFunc) Match(event Event) bool {
+ return fn(event)
+}
+
+// Filter provides an event sink that sends only events that are accepted by a
+// Matcher. No methods on filter are goroutine safe.
+type Filter struct {
+ dst Sink
+ matcher Matcher
+ closed bool
+}
+
+// NewFilter returns a new filter that sends to dst only those events for
+// which the matcher returns true.
+func NewFilter(dst Sink, matcher Matcher) Sink {
+ return &Filter{dst: dst, matcher: matcher}
+}
+
+// Write an event to the filter.
+func (f *Filter) Write(event Event) error {
+ if f.closed {
+ return ErrSinkClosed
+ }
+
+ if f.matcher.Match(event) {
+ return f.dst.Write(event)
+ }
+
+ return nil
+}
+
+// Close the filter and allow no more events to pass through.
+func (f *Filter) Close() error {
+ // TODO(stevvooe): Not all sinks should have Close.
+ if f.closed {
+ return nil
+ }
+
+ f.closed = true
+ return f.dst.Close()
+}
--- /dev/null
+package events
+
+import "testing"
+
+func TestFilter(t *testing.T) {
+ const nevents = 100
+ ts := newTestSink(t, nevents/2)
+ filter := NewFilter(ts, MatcherFunc(func(event Event) bool {
+ i, ok := event.(int)
+ return ok && i%2 == 0
+ }))
+
+ for i := 0; i < nevents; i++ {
+ if err := filter.Write(i); err != nil {
+ t.Fatalf("unexpected error writing event: %v", err)
+ }
+ }
+
+ checkClose(t, filter)
+
+}
--- /dev/null
+package events
+
+import (
+ "container/list"
+ "sync"
+
+ "github.com/sirupsen/logrus"
+)
+
+// Queue accepts all messages into a queue for asynchronous consumption
+// by a sink. It is unbounded and thread safe but the sink must be reliable or
+// events will be dropped.
+type Queue struct {
+ dst Sink
+ events *list.List
+ cond *sync.Cond
+ mu sync.Mutex
+ closed bool
+}
+
+// NewQueue returns a queue to the provided Sink dst.
+func NewQueue(dst Sink) *Queue {
+ eq := Queue{
+ dst: dst,
+ events: list.New(),
+ }
+
+ eq.cond = sync.NewCond(&eq.mu)
+ go eq.run()
+ return &eq
+}
+
+// Write accepts the events into the queue, only failing if the queue has
+// been closed.
+func (eq *Queue) Write(event Event) error {
+ eq.mu.Lock()
+ defer eq.mu.Unlock()
+
+ if eq.closed {
+ return ErrSinkClosed
+ }
+
+ eq.events.PushBack(event)
+ eq.cond.Signal() // signal waiters
+
+ return nil
+}
+
+// Close shuts down the event queue, flushing any pending events to the
+// destination sink before closing it.
+func (eq *Queue) Close() error {
+ eq.mu.Lock()
+ defer eq.mu.Unlock()
+
+ if eq.closed {
+ return nil
+ }
+
+ // set closed flag
+ eq.closed = true
+ eq.cond.Signal() // signal flushes queue
+ eq.cond.Wait() // wait for signal from last flush
+ return eq.dst.Close()
+}
+
+// run is the main goroutine to flush events to the target sink.
+func (eq *Queue) run() {
+ for {
+ event := eq.next()
+
+ if event == nil {
+			return // nil means the event queue is closed.
+ }
+
+ if err := eq.dst.Write(event); err != nil {
+ // TODO(aaronl): Dropping events could be bad depending
+ // on the application. We should have a way of
+ // communicating this condition. However, logging
+ // at a log level above debug may not be appropriate.
+ // Eventually, go-events should not use logrus at all,
+ // and should bubble up conditions like this through
+ // error values.
+ logrus.WithFields(logrus.Fields{
+ "event": event,
+ "sink": eq.dst,
+ }).WithError(err).Debug("eventqueue: dropped event")
+ }
+ }
+}
+
+// next encompasses the critical section of the run loop. When the queue is
+// empty, it will block on the condition. If new data arrives, it will wake
+// and return the next event. When closed, nil will be returned.
+func (eq *Queue) next() Event {
+ eq.mu.Lock()
+ defer eq.mu.Unlock()
+
+ for eq.events.Len() < 1 {
+ if eq.closed {
+ eq.cond.Broadcast()
+ return nil
+ }
+
+ eq.cond.Wait()
+ }
+
+ front := eq.events.Front()
+ block := front.Value.(Event)
+ eq.events.Remove(front)
+
+ return block
+}
--- /dev/null
+package events
+
+import (
+ "fmt"
+ "sync"
+ "testing"
+ "time"
+)
+
+func TestQueue(t *testing.T) {
+ const nevents = 1000
+
+ ts := newTestSink(t, nevents)
+ eq := NewQueue(
+ // delayed sync simulates destination slower than channel comms
+ &delayedSink{
+ Sink: ts,
+ delay: time.Millisecond * 1,
+ })
+	time.Sleep(10 * time.Millisecond) // let the queue settle into its wait condition.
+
+ var wg sync.WaitGroup
+ for i := 1; i <= nevents; i++ {
+ wg.Add(1)
+ go func(event Event) {
+ if err := eq.Write(event); err != nil {
+ t.Fatalf("error writing event: %v", err)
+ }
+ wg.Done()
+ }("event-" + fmt.Sprint(i))
+ }
+
+ wg.Wait()
+ checkClose(t, eq)
+
+ ts.mu.Lock()
+ defer ts.mu.Unlock()
+
+ if len(ts.events) != nevents {
+		t.Fatalf("events did not make it to the sink: %d != %d", len(ts.events), nevents)
+ }
+
+ if !ts.closed {
+ t.Fatalf("sink should have been closed")
+ }
+}
--- /dev/null
+package events
+
+import (
+ "fmt"
+ "math/rand"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/sirupsen/logrus"
+)
+
+// RetryingSink retries the write until success or an ErrSinkClosed is
+// returned. The underlying sink must have a nonzero probability of success or
+// writes will block indefinitely. Retry is configured with a RetryStrategy.
+// Concurrent calls to a retrying sink are serialized through the sink,
+// meaning that if one is in-flight, another will not proceed.
+type RetryingSink struct {
+ sink Sink
+ strategy RetryStrategy
+ closed chan struct{}
+ once sync.Once
+}
+
+// NewRetryingSink returns a sink that will retry writes to the provided sink,
+// backing off on failure according to the provided RetryStrategy.
+func NewRetryingSink(sink Sink, strategy RetryStrategy) *RetryingSink {
+ rs := &RetryingSink{
+ sink: sink,
+ strategy: strategy,
+ closed: make(chan struct{}),
+ }
+
+ return rs
+}
+
+// Write attempts to flush the events to the downstream sink until it succeeds
+// or the sink is closed.
+func (rs *RetryingSink) Write(event Event) error {
+ logger := logrus.WithField("event", event)
+
+retry:
+ select {
+ case <-rs.closed:
+ return ErrSinkClosed
+ default:
+ }
+
+ if backoff := rs.strategy.Proceed(event); backoff > 0 {
+ select {
+ case <-time.After(backoff):
+ // TODO(stevvooe): This branch holds up the next try. Before, we
+ // would simply break to the "retry" label and then possibly wait
+ // again. However, this requires all retry strategies to have a
+ // large probability of probing the sync for success, rather than
+ // just backing off and sending the request.
+ case <-rs.closed:
+ return ErrSinkClosed
+ }
+ }
+
+ if err := rs.sink.Write(event); err != nil {
+ if err == ErrSinkClosed {
+ // terminal!
+ return err
+ }
+
+ logger := logger.WithError(err) // shadow!!
+
+ if rs.strategy.Failure(event, err) {
+ logger.Errorf("retryingsink: dropped event")
+ return nil
+ }
+
+ logger.Errorf("retryingsink: error writing event, retrying")
+ goto retry
+ }
+
+ rs.strategy.Success(event)
+ return nil
+}
+
+// Close closes the sink and the underlying sink.
+func (rs *RetryingSink) Close() error {
+ rs.once.Do(func() {
+ close(rs.closed)
+ })
+
+ return nil
+}
+
+func (rs *RetryingSink) String() string {
+ // Serialize a copy of the RetryingSink without the sync.Once, to avoid
+ // a data race.
+ rs2 := map[string]interface{}{
+ "sink": rs.sink,
+ "strategy": rs.strategy,
+ "closed": rs.closed,
+ }
+ return fmt.Sprint(rs2)
+}
+
+// RetryStrategy defines a strategy for retrying event sink writes.
+//
+// All methods should be goroutine safe.
+type RetryStrategy interface {
+	// Proceed is called before every event send. If proceed returns a
+	// positive, non-zero duration, the retryer will back off by the provided
+	// duration.
+	//
+	// An event is provided, but may be ignored.
+ Proceed(event Event) time.Duration
+
+ // Failure reports a failure to the strategy. If this method returns true,
+ // the event should be dropped.
+ Failure(event Event, err error) bool
+
+ // Success should be called when an event is sent successfully.
+ Success(event Event)
+}
+
+// Breaker implements a circuit breaker retry strategy.
+//
+// The current implementation never drops events.
+type Breaker struct {
+ threshold int
+ recent int
+ last time.Time
+ backoff time.Duration // time after which we retry after failure.
+ mu sync.Mutex
+}
+
+var _ RetryStrategy = &Breaker{}
+
+// NewBreaker returns a breaker that will back off after the threshold has been
+// tripped. A Breaker is thread safe and may be shared by many goroutines.
+func NewBreaker(threshold int, backoff time.Duration) *Breaker {
+ return &Breaker{
+ threshold: threshold,
+ backoff: backoff,
+ }
+}
+
+// Proceed checks the failures against the threshold.
+func (b *Breaker) Proceed(event Event) time.Duration {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if b.recent < b.threshold {
+ return 0
+ }
+
+ return b.last.Add(b.backoff).Sub(time.Now())
+}
+
+// Success resets the breaker.
+func (b *Breaker) Success(event Event) {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ b.recent = 0
+ b.last = time.Time{}
+}
+
+// Failure records the failure and latest failure time.
+func (b *Breaker) Failure(event Event, err error) bool {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ b.recent++
+ b.last = time.Now().UTC()
+ return false // never drop events.
+}
+
+var (
+ // DefaultExponentialBackoffConfig provides a default configuration for
+ // exponential backoff.
+ DefaultExponentialBackoffConfig = ExponentialBackoffConfig{
+ Base: time.Second,
+ Factor: time.Second,
+ Max: 20 * time.Second,
+ }
+)
+
+// ExponentialBackoffConfig configures backoff parameters.
+//
+// Note that these parameters operate on the upper bound for choosing a random
+// value. For example, at Base=1s, a random value in [0,1s) will be chosen for
+// the backoff value.
+type ExponentialBackoffConfig struct {
+ // Base is the minimum bound for backing off after failure.
+ Base time.Duration
+
+ // Factor sets the amount of time by which the backoff grows with each
+ // failure.
+ Factor time.Duration
+
+	// Max is the absolute maximum bound for a single backoff.
+ Max time.Duration
+}
+
+// ExponentialBackoff implements random backoff with exponentially increasing
+// bounds as the number of consecutive failures increases.
+type ExponentialBackoff struct {
+ config ExponentialBackoffConfig
+ failures uint64 // consecutive failure counter.
+}
+
+// NewExponentialBackoff returns an exponential backoff strategy with the
+// desired config. If config is nil, the default is returned.
+func NewExponentialBackoff(config ExponentialBackoffConfig) *ExponentialBackoff {
+ return &ExponentialBackoff{
+ config: config,
+ }
+}
+
+// Proceed returns the next randomly bound exponential backoff time.
+func (b *ExponentialBackoff) Proceed(event Event) time.Duration {
+ return b.backoff(atomic.LoadUint64(&b.failures))
+}
+
+// Success resets the failures counter.
+func (b *ExponentialBackoff) Success(event Event) {
+ atomic.StoreUint64(&b.failures, 0)
+}
+
+// Failure increments the failure counter.
+func (b *ExponentialBackoff) Failure(event Event, err error) bool {
+ atomic.AddUint64(&b.failures, 1)
+ return false
+}
+
+// backoff calculates the amount of time to wait based on the number of
+// consecutive failures.
+func (b *ExponentialBackoff) backoff(failures uint64) time.Duration {
+ if failures <= 0 {
+ // proceed normally when there are no failures.
+ return 0
+ }
+
+ factor := b.config.Factor
+ if factor <= 0 {
+ factor = DefaultExponentialBackoffConfig.Factor
+ }
+
+ backoff := b.config.Base + factor*time.Duration(1<<(failures-1))
+
+ max := b.config.Max
+ if max <= 0 {
+ max = DefaultExponentialBackoffConfig.Max
+ }
+
+ if backoff > max || backoff < 0 {
+ backoff = max
+ }
+
+ // Choose a uniformly distributed value from [0, backoff).
+ return time.Duration(rand.Int63n(int64(backoff)))
+}
--- /dev/null
+package events
+
+import (
+ "fmt"
+ "sync"
+ "testing"
+ "time"
+)
+
+func TestRetryingSinkBreaker(t *testing.T) {
+ testRetryingSink(t, NewBreaker(3, 10*time.Millisecond))
+}
+
+func TestRetryingSinkExponentialBackoff(t *testing.T) {
+ testRetryingSink(t, NewExponentialBackoff(ExponentialBackoffConfig{
+ Base: time.Millisecond,
+ Factor: time.Millisecond,
+ Max: time.Millisecond * 5,
+ }))
+}
+
+func testRetryingSink(t *testing.T, strategy RetryStrategy) {
+ const nevents = 100
+ ts := newTestSink(t, nevents)
+
+	// Make a sink that fails most of the time, and ensure that all the events
+	// still make it through.
+ flaky := &flakySink{
+ rate: 1.0, // start out always failing.
+ Sink: ts,
+ }
+
+ s := NewRetryingSink(flaky, strategy)
+
+ var wg sync.WaitGroup
+ for i := 1; i <= nevents; i++ {
+ event := "myevent-" + fmt.Sprint(i)
+
+ // Above 50, set the failure rate lower
+ if i > 50 {
+ flaky.mu.Lock()
+ flaky.rate = 0.9
+ flaky.mu.Unlock()
+ }
+
+ wg.Add(1)
+ go func(event Event) {
+ defer wg.Done()
+ if err := s.Write(event); err != nil {
+ t.Fatalf("error writing event: %v", err)
+ }
+ }(event)
+ }
+
+ wg.Wait()
+ checkClose(t, s)
+
+ ts.mu.Lock()
+ defer ts.mu.Unlock()
+}
+
+func TestExponentialBackoff(t *testing.T) {
+ strategy := NewExponentialBackoff(DefaultExponentialBackoffConfig)
+ backoff := strategy.Proceed(nil)
+
+ if backoff != 0 {
+ t.Errorf("untouched backoff should be zero-wait: %v != 0", backoff)
+ }
+
+ expected := strategy.config.Base + strategy.config.Factor
+ for i := 1; i <= 10; i++ {
+ if strategy.Failure(nil, nil) {
+ t.Errorf("no facilities for dropping events in ExponentialBackoff")
+ }
+
+ for j := 0; j < 1000; j++ {
+ // sample this several thousand times.
+ backoff := strategy.Proceed(nil)
+ if backoff > expected {
+ t.Fatalf("expected must be bounded by %v after %v failures: %v", expected, i, backoff)
+ }
+ }
+
+ expected = strategy.config.Base + strategy.config.Factor*time.Duration(1<<uint64(i))
+ if expected > strategy.config.Max {
+ expected = strategy.config.Max
+ }
+ }
+
+ strategy.Success(nil) // recovery!
+
+ backoff = strategy.Proceed(nil)
+ if backoff != 0 {
+ t.Errorf("should have recovered: %v != 0", backoff)
+ }
+}