cookpad/streamy

Name: streamy

Owner: Cookpad Inc.

Description: Basic toolset for hooking into event stream

Created: 2017-04-13 05:26:26.0

Updated: 2018-05-24 13:24:00.0

Pushed: 2018-05-24 13:24:01.0

Homepage: null

Size: 134

Language: Ruby

GitHub Committers

UserMost Recent Commit# Commits

Other Committers

UserEmailMost Recent Commit# Commits

README

Streamy

Build Status

Installation

Add this line to your application's Gemfile:

'streamy'
Usage
Broadcasting events

Add this to config/initializer/streamy.rb

amy.message_bus = Streamy::MessageBuses::RabbitMessageBus.new(
i: "amqp://..."

Create an event:

le Events
ass ReceivedPayment < Streamy::Event
def topic
   "payments.transactions"
end

def body
  {
    amount: 200
  }
end

def event_time
  Time.now
end
d

Publish it:

ts::ReceivedPayment.publish
Consuming events

Configure the worker:

nfig/streamy.rb
amy.worker = Streamy::Workers::RabbitWorker.new(
i: "amqp://..."

For built-in deduplication of events:

nfig/streamy.rb
amy.cache = Rails.cache

Add consumer(s):

p/consumers/event_consumer.rb
s EventConsumer
clude Streamy::Consumer

Specify a topic to consume
nsume "payments.#"

Add event handler(s):

le EventHandlers
ass ReceivedPayment < Streamy::EventHandler
def process
  PaymentCounter.increment(body[:amount])
end
d

Start consuming:

rake streamy:worker:run
Consuming replayed events

Use replay instead of consume:

s EventConsumer
clude Streamy::Consumer

play "payments.#"

This will create two queues:

t_consumer # binds to `payments.#`, accumulates realtime events but doesn't process
t_consumer_replay # binds to `replay.event_consumer.payments.#`, processes replay events

Once caught up, switch back to consume and optionally specify a start_from timestamp to filter out any realtime events that may have already been replayed:

s EventConsumer
clude Streamy::Consumer

art_from 1525058571 # last event in replay queue
nsume "payments.#"

License

The gem is available as open source under the terms of the MIT License.


This work is supported by the National Institutes of Health's National Center for Advancing Translational Sciences, Grant Number U24TR002306. This work is solely the responsibility of the creators and does not necessarily represent the official views of the National Institutes of Health.