aboutsummaryrefslogtreecommitdiff
path: root/models/persister.go
blob: 60c25b5486bd3d1d0d21958d3c60205ae92285fd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package models

import (
	"github.com/dgraph-io/badger/v3"
	"go.uber.org/zap"
)

// Persister bundles a badger database handle with a logger. The methods
// defined on it in this file (Store, Count, Collect, Find) perform all of
// their reads and writes through these two fields.
type Persister struct {
	// log is the logger the methods write their debug output to.
	log *zap.SugaredLogger
	// db is the badger database every transaction is opened on.
	db  *badger.DB
}

// PersisterResult holds a key and a value as raw bytes.
//
// NOTE(review): nothing in the visible part of this file constructs or
// returns this type; presumably it pairs a stored entry's key with its
// value — confirm against callers.
type PersisterResult struct {
	// Value is the raw value bytes.
	Value []byte
	// Key is the raw key bytes.
	Key   []byte
}

// NewPersister builds a Persister that operates on db. The supplied logger
// is decorated with the field "type"="db" before being stored, so every
// entry logged through the Persister carries that field.
func NewPersister(log *zap.SugaredLogger, db *badger.DB) *Persister {
	return &Persister{
		log: log.With("type", "db"),
		db:  db,
	}
}

// Store persists model unless an entry already exists under its dedup key.
//
// The lookup of model.DedupKey() and both writes (model.Save for the item
// itself, model.SaveDedup for its dedup marker) run inside one read-write
// transaction, so either both records are committed or neither is.
//
// It returns nil when the model was stored or when its dedup key was already
// present. Any other lookup error, a failure of either write, or a commit
// failure is returned to the caller.
func (p *Persister) Store(model model) error {
	log := p.log.With("model", model.Name())
	log.Debug("store")
	return p.db.Update(func(txn *badger.Txn) error {
		dedupKey := model.DedupKey()
		keyLog := log.With("DedupKey", dedupKey)

		keyLog.Debug("checking")
		_, err := txn.Get([]byte(dedupKey))
		if err == nil {
			// The dedup marker is already there: nothing to store.
			return nil
		}
		if err != badger.ErrKeyNotFound {
			// A real lookup failure must not be mistaken for "already stored".
			return err
		}

		keyLog.Debug("not found")
		if err := model.Save(txn); err != nil { // stores the outbox item
			return err
		}
		keyLog.Debug("saving")
		return model.SaveDedup(txn) // stores the sha256
	})
}

// Count reports how many keys in the database start with model.Keybase().
// The iterator is configured not to prefetch values, since only the keys are
// needed for counting. The error, if any, is the one returned by the
// read-only transaction.
func (p *Persister) Count(model model) (int, error) {
	total := 0
	countKeys := func(txn *badger.Txn) error {
		iterOpts := badger.DefaultIteratorOptions
		iterOpts.PrefetchValues = false

		iter := txn.NewIterator(iterOpts)
		defer iter.Close()

		keyPrefix := []byte(model.Keybase())
		iter.Seek(keyPrefix)
		for iter.ValidForPrefix(keyPrefix) {
			total++
			iter.Next()
		}
		return nil
	}
	err := p.db.View(countKeys)
	return total, err
}

// Collect returns the values of all entries whose key starts with
// model.Keybase(), in iteration order.
//
// Each value is copied out of the database with ValueCopy before the
// transaction ends: the slice badger hands to an Item.Value callback is only
// valid inside that callback, so keeping it would leave the caller with
// memory badger may reuse.
//
// On any read error the partial result is discarded and (nil, err) is
// returned. When no key matches, the result is a nil slice and a nil error.
func (p *Persister) Collect(model model) ([][]byte, error) {
	var result [][]byte
	err := p.db.View(func(txn *badger.Txn) error {
		opts := badger.DefaultIteratorOptions
		opts.PrefetchValues = true
		it := txn.NewIterator(opts)
		defer it.Close()

		prefix := []byte(model.Keybase())
		for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
			// Passing nil makes ValueCopy allocate a fresh slice per item.
			v, err := it.Item().ValueCopy(nil)
			if err != nil {
				return err
			}
			result = append(result, v)
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return result, nil
}

// Find returns a copy of the value stored under model.Key().
//
// The value is read and copied while the read-only transaction is still
// open; a badger Item and the bytes it exposes must not be used once the
// transaction that produced them has finished.
//
// Errors from the lookup are returned unchanged, so a missing key surfaces
// as the error txn.Get reports for it (badger.ErrKeyNotFound). On error the
// returned slice is nil.
func (p *Persister) Find(model model) ([]byte, error) {
	var result []byte
	err := p.db.View(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte(model.Key()))
		if err != nil {
			return err
		}
		// Plain assignment (not :=) so the outer result is the one written.
		result, err = item.ValueCopy(nil)
		return err
	})
	if err != nil {
		return nil, err
	}
	return result, nil
}