// Package cgi runs each configured handler as a CGI program behind its own
// unix-socket HTTP server and polls it on a ticker, storing the string
// contents of the returned notes in badger.
package cgi

import (
	"context"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net"
	"net/http"
	"net/http/cgi"
	"os"
	"sync"
	"time"

	"git.capotej.com/capotej/communique/config"
	"github.com/dgraph-io/badger/v3"
	"github.com/go-fed/activity/streams"
	"github.com/go-fed/activity/streams/vocab"
	"go.uber.org/zap"
)

// Servers bundles the logger and database shared by every handler's ticker.
type Servers struct {
	log *zap.SugaredLogger
	db  *badger.DB
}

// NewServers returns a Servers that logs to log and writes tick results to db.
func NewServers(log *zap.SugaredLogger, db *badger.DB) *Servers {
	return &Servers{log: log, db: db}
}

// Start iterates over all Handlers and starts an internal CGI server for each
// one along with a ticker for the configured handler interval, then blocks
// until every one of those goroutines has returned.
func (s *Servers) Start(cfg config.Config) {
	var wg sync.WaitGroup
	for _, handler := range cfg.Handlers {
		wg.Add(2)
		// The handler is passed as an argument so each goroutine works on its
		// own copy rather than on the shared loop variable.
		go func(aHandler config.Handler) {
			defer wg.Done()
			startCGIServer(aHandler)
		}(handler)
		go func(aHandler config.Handler) {
			defer wg.Done()
			startTicker(aHandler, s.db, s.log)
		}(handler)
	}
	wg.Wait()
}

// startCGIServer serves h.Exec as a CGI program over HTTP on the unix socket
// "<h.Name>.sock" and blocks until the server stops. It panics if the socket
// cannot be listened on.
func startCGIServer(h config.Handler) {
	cgiHandler := cgi.Handler{Path: h.Exec}
	server := http.Server{
		Handler: &cgiHandler,
	}

	sock := fmt.Sprintf("%s.sock", h.Name)
	// Best-effort removal of a socket file left over from a previous run; a
	// failure here (typically "not exist") surfaces through net.Listen below
	// if it actually matters.
	_ = os.Remove(sock)

	unixListener, err := net.Listen("unix", sock)
	if err != nil {
		panic(err)
	}

	// Serve only returns once the listener fails or the server is shut down.
	// No logger is available in this function, so the error is discarded as
	// it was before.
	_ = server.Serve(unixListener)
}

// startTicker polls handler h once per h.Interval and hands each response to
// processTick. Failures of a single tick are logged and do not stop the loop.
// It returns immediately, after logging, when h.Interval is not positive;
// otherwise it does not return.
func startTicker(h config.Handler, db *badger.DB, log *zap.SugaredLogger) {
	// time.NewTicker panics on a non-positive duration, which would take the
	// whole process down because of one misconfigured handler.
	if h.Interval <= 0 {
		log.Errorf("handler %s: ticker not started, interval %s is not positive", h.Name, h.Interval)
		return
	}

	ticker := time.NewTicker(h.Interval)
	defer ticker.Stop()

	for range ticker.C {
		output, err := fetchTick(h)
		if err != nil {
			log.Error(err)
			continue
		}
		if err := processTick(h, output, db, log); err != nil {
			log.Error(err)
		}
	}
}

// processTick decodes output as a JSON ActivityStreams OrderedCollection and
// stores every string content value of every Note in its ordered items under
// the key "outbox:<h.Name>:<n>", where n comes from a badger sequence keyed by
// "outbox:<h.Name>". Items that are not Notes, and content values that are not
// plain strings, are skipped. It returns the first error encountered.
func processTick(h config.Handler, output []byte, db *badger.DB, log *zap.SugaredLogger) error {
	var m map[string]interface{}
	if err := json.Unmarshal(output, &m); err != nil {
		return fmt.Errorf("could not unmarshal JSON: %w", err)
	}
	log.Debugf("processTick map is %+v", m)

	// The resolver callback captures the decoded collection in the enclosing
	// scope so it can be walked after Resolve returns.
	var coll vocab.ActivityStreamsOrderedCollection
	resolver, err := streams.NewJSONResolver(func(c context.Context, a vocab.ActivityStreamsOrderedCollection) error {
		coll = a
		return nil
	})
	if err != nil {
		return fmt.Errorf("could not create JSON resolver: %w", err)
	}
	if err := resolver.Resolve(context.Background(), m); err != nil {
		return fmt.Errorf("could not resolve JSON: %w", err)
	}
	if coll == nil {
		return nil
	}

	// A collection without an orderedItems property has nothing to store.
	orderedProp := coll.GetActivityStreamsOrderedItems()
	if orderedProp == nil {
		return nil
	}

	keyBase := fmt.Sprintf("outbox:%s", h.Name)
	seq, err := db.GetSequence([]byte(keyBase), 10)
	if err != nil {
		return fmt.Errorf("could not get sequence: %w", err)
	}
	defer func() {
		if err := seq.Release(); err != nil {
			log.Errorf("could not release sequence %s: %v", keyBase, err)
		}
	}()

	for iter := orderedProp.Begin(); iter != orderedProp.End(); iter = iter.Next() {
		// GetType is nil for entries that carry no typed value (e.g. a bare IRI).
		if itemType := iter.GetType(); itemType != nil {
			log.Debugf("found object type %s in activity stream", itemType.GetTypeName())
		}

		contentNote := iter.GetActivityStreamsNote()
		if contentNote == nil {
			continue
		}
		content := contentNote.GetActivityStreamsContent()
		if content == nil {
			continue
		}

		for contentIter := content.Begin(); contentIter != content.End(); contentIter = contentIter.Next() {
			// Only plain string content is stored; other representations
			// would otherwise be written as empty values.
			if !contentIter.IsXMLSchemaString() {
				continue
			}
			contentVal := contentIter.GetXMLSchemaString()
			log.Debugf("contentVal is %+v", contentVal)

			autoinc, err := seq.Next()
			if err != nil {
				return fmt.Errorf("could not auto inc: %w", err)
			}
			keyName := fmt.Sprintf("%s:%d", keyBase, autoinc)

			err = db.Update(func(txn *badger.Txn) error {
				e := badger.NewEntry([]byte(keyName), []byte(contentVal))
				log.With("name", "ticker").Infof("writing '%s' to %s", contentVal, keyName)
				return txn.SetEntry(e)
			})
			if err != nil {
				return fmt.Errorf("could not insert into outbox: %w", err)
			}
		}
	}
	return nil
}

// fetchTick issues one GET request to handler h's CGI server over the unix
// socket "<h.Name>.sock" and returns the response body. It returns an error
// when the request fails, the body cannot be read, or the status is not 2xx.
func fetchTick(h config.Handler) ([]byte, error) {
	sock := fmt.Sprintf("%s.sock", h.Name)
	transport := &http.Transport{
		// Host and port of the request URL are ignored; every connection goes
		// to the handler's unix socket.
		DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
			var d net.Dialer
			return d.DialContext(ctx, "unix", sock)
		},
	}
	// The transport is created per call, so release its idle connection
	// instead of leaving one behind on every tick.
	defer transport.CloseIdleConnections()
	httpc := http.Client{Transport: transport}

	response, err := httpc.Get("http://unix/" + sock)
	if err != nil {
		return nil, fmt.Errorf("could not request handler %s: %w", h.Name, err)
	}
	defer response.Body.Close()

	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		return nil, fmt.Errorf("could not read response from handler %s: %w", h.Name, err)
	}
	if response.StatusCode < 200 || response.StatusCode >= 300 {
		return nil, fmt.Errorf("handler %s returned status %s", h.Name, response.Status)
	}
	return body, nil
}

// tick performs a single request against handler h and returns the response
// body, panicking on any failure. It is kept with its original signature for
// in-package callers; the ticker loop uses fetchTick so that a failed poll is
// logged instead of crashing the process.
func tick(h config.Handler) []byte {
	body, err := fetchTick(h)
	if err != nil {
		panic(err)
	}
	return body
}