aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cgi/servers.go2
-rw-r--r--config/config.go9
-rw-r--r--main.go8
-rw-r--r--models/dedup_item.go14
-rw-r--r--models/model.go3
-rw-r--r--models/outbox_item.go29
-rw-r--r--models/persister.go22
-rw-r--r--sample-config.toml3
8 files changed, 68 insertions, 22 deletions
diff --git a/cgi/servers.go b/cgi/servers.go
index c751135..58d72c1 100644
--- a/cgi/servers.go
+++ b/cgi/servers.go
@@ -62,7 +62,7 @@ func startCGIServer(h config.Handler) {
}
func startTicker(h config.Handler, persister *models.Persister, log *zap.SugaredLogger) {
- ticker := time.NewTicker(h.Interval)
+ ticker := time.NewTicker(h.Interval) // TODO add some random jitter here so handlers dont run at the same exact intervals
done := make(chan bool)
func() {
for {
diff --git a/config/config.go b/config/config.go
index e8e5387..3c6cdde 100644
--- a/config/config.go
+++ b/config/config.go
@@ -9,8 +9,9 @@ type Config struct {
}
type Handler struct {
- Name string
- Exec string
- Rpc string
- Interval time.Duration
+ Name string
+ Exec string
+ Rpc string
+ DedupWindow time.Duration
+ Interval time.Duration
}
diff --git a/main.go b/main.go
index 35b5b80..e65a600 100644
--- a/main.go
+++ b/main.go
@@ -39,6 +39,14 @@ func main() {
}
defer db.Close()
+ log.With("type", "db").Debug("Running GC")
+ err = db.RunValueLogGC(0.5)
+ if err == badger.ErrNoRewrite {
+ log.With("type", "db").Debug("Nothing to GC")
+ } else {
+ log.Fatal(err)
+ }
+
// Persister
persister := models.NewPersister(log, db)
diff --git a/models/dedup_item.go b/models/dedup_item.go
deleted file mode 100644
index b079181..0000000
--- a/models/dedup_item.go
+++ /dev/null
@@ -1,14 +0,0 @@
-package models
-
-// func NewActivityStreamsObject(obj vocab.ActivityStreamsObject) *ActivityStreamsObject {
-// aso := &ActivityStreamsObject{wrappedObject: obj}
-// return aso
-// }
-
-// func (a *ActivityStreamsObject) Id() []byte {
-
-// }
-
-// func (a *ActivityStreamsObject) Value() []byte {
-
-// }
diff --git a/models/model.go b/models/model.go
index 354b3bd..8832ce1 100644
--- a/models/model.go
+++ b/models/model.go
@@ -3,7 +3,10 @@ package models
import "github.com/dgraph-io/badger/v3"
type model interface {
+ Name() string
Save(txn *badger.Txn) error
+ SaveDedup(txn *badger.Txn) error
Keybase() string
Key() string
+ DedupKey() string
}
diff --git a/models/outbox_item.go b/models/outbox_item.go
index 9d9de97..34457d6 100644
--- a/models/outbox_item.go
+++ b/models/outbox_item.go
@@ -2,6 +2,7 @@ package models
import (
"bytes"
+ "crypto/sha256"
"encoding/gob"
"fmt"
"time"
@@ -15,6 +16,7 @@ type OutboxItem struct {
Handler config.Handler
Content []byte
Id []byte
+ Sha256 string
CreatedAt time.Time
}
@@ -26,25 +28,48 @@ func NewOutboxItem(h config.Handler) *OutboxItem {
func CreateOutboxItem(h config.Handler, content []byte) *OutboxItem {
t := time.Now()
+ hash := sha256.New()
+ hash.Write(content)
+ binHash := hash.Sum(nil)
k, _ := ksuid.NewRandomWithTime(t)
aso := &OutboxItem{
Handler: h,
CreatedAt: t,
Content: content,
+ Sha256: fmt.Sprintf("%x", binHash),
Id: []byte(k.String()), // NOTE: we want the bytes of the string representation of a hash, NOT a binary hash
}
return aso
}
+func (a *OutboxItem) Name() string {
+ return "OutboxItem"
+}
+
func (a *OutboxItem) Key() string {
return fmt.Sprintf("%s:%s", a.Keybase(), a.Id)
}
+func (a *OutboxItem) DedupKey() string {
+ return fmt.Sprintf("dd:%s:sha256:%s", a.Keybase(), a.Sha256)
+}
+
func (a *OutboxItem) Keybase() string {
keyBase := fmt.Sprintf("outboxes:%s", a.Handler.Name)
return keyBase
}
+func (a *OutboxItem) SaveDedup(txn *badger.Txn) error {
+ if len(a.Content) == 0 {
+ return fmt.Errorf("content not set")
+ }
+ if len(a.Sha256) == 0 {
+ return fmt.Errorf("sha256 not set")
+ }
+ e := badger.NewEntry([]byte(a.DedupKey()), nil).WithTTL(a.Handler.DedupWindow)
+ return txn.SetEntry(e)
+}
+
func (a *OutboxItem) Save(txn *badger.Txn) error {
if len(a.Content) == 0 {
return fmt.Errorf("content not set")
@@ -52,6 +77,10 @@ func (a *OutboxItem) Save(txn *badger.Txn) error {
if len(a.Id) == 0 {
return fmt.Errorf("id not set")
}
+ if len(a.Sha256) == 0 {
+ return fmt.Errorf("sha256 not set")
+ }
+
var network bytes.Buffer
enc := gob.NewEncoder(&network)
err := enc.Encode(a)
diff --git a/models/persister.go b/models/persister.go
index 69e79f4..60c25b5 100644
--- a/models/persister.go
+++ b/models/persister.go
@@ -16,12 +16,30 @@ type PersisterResult struct {
}
func NewPersister(log *zap.SugaredLogger, db *badger.DB) *Persister {
- aso := &Persister{log: log, db: db}
+ logger := log.With("type", "db")
+ aso := &Persister{log: logger, db: db}
return aso
}
func (p *Persister) Store(model model) error {
- err := p.db.Update(model.Save)
+ log := p.log.With("model", model.Name())
+ log.Debug("store")
+ err := p.db.View(func(txn *badger.Txn) error {
+ var getErr error
+ var updateErr error
+ log.With("DedupKey", model.DedupKey()).Debug("checking")
+ _, getErr = txn.Get([]byte(model.DedupKey()))
+ if getErr == badger.ErrKeyNotFound {
+ log.With("DedupKey", model.DedupKey()).Debug("not found")
+ updateErr = p.db.Update(model.Save) // stores the outbox item
+ if updateErr != nil {
+ return updateErr
+ }
+ log.With("DedupKey", model.DedupKey()).Debug("saving")
+ updateErr = p.db.Update(model.SaveDedup) // stores the sha256
+ }
+ return updateErr
+ })
return err
}
diff --git a/sample-config.toml b/sample-config.toml
index 218660a..a77678c 100644
--- a/sample-config.toml
+++ b/sample-config.toml
@@ -6,7 +6,8 @@ name = "sample"
rpc = "cgi" # rename to protocol?
# add response type? like Note
exec = "sample-cgi-handler.sh"
-interval = "55s"
+interval = "5s"
+dedupWindow = "1m"
# [[handlers]]
# name = "another"