From 9280ecb7e0b0039d6c1f4800373eb76452145078 Mon Sep 17 00:00:00 2001 From: René 'Necoro' Neumann Date: Wed, 22 Apr 2020 23:40:53 +0200 Subject: Concurrent feed processing; central imap handler --- internal/feed/mail.go | 8 ++--- internal/imap/client.go | 29 +++++++++++++-- internal/imap/commando.go | 92 +++++++++++++++++++++++++++++++++++++++++++++++ internal/imap/imap.go | 4 ++- internal/yaml/yaml.go | 10 +++--- main.go | 49 +++++++++++++++---------- 6 files changed, 161 insertions(+), 31 deletions(-) create mode 100644 internal/imap/commando.go diff --git a/internal/feed/mail.go b/internal/feed/mail.go index 256c4fe..e8b6915 100644 --- a/internal/feed/mail.go +++ b/internal/feed/mail.go @@ -16,7 +16,7 @@ func address(name, address string) []*mail.Address { return []*mail.Address{{Name: name, Address: address}} } -func fromAdress(feed *Feed, item feeditem, cfg config.Config) []*mail.Address { +func fromAdress(feed *Feed, item feeditem, cfg *config.Config) []*mail.Address { switch { case item.Item.Author != nil && item.Item.Author.Email != "": return address(item.Item.Author.Name, item.Item.Author.Email) @@ -35,7 +35,7 @@ func writeHtml(writer io.Writer, item feeditem) error { return template.Feed.Execute(writer, item) } -func writeToBuffer(b *bytes.Buffer, feed *Feed, item feeditem, cfg config.Config) error { +func writeToBuffer(b *bytes.Buffer, feed *Feed, item feeditem, cfg *config.Config) error { var h mail.Header h.SetAddressList("From", fromAdress(feed, item, cfg)) h.SetAddressList("To", address(feed.Name, cfg.DefaultEmail)) @@ -98,7 +98,7 @@ func writeToBuffer(b *bytes.Buffer, feed *Feed, item feeditem, cfg config.Config return nil } -func asMail(feed *Feed, item feeditem, cfg config.Config) (string, error) { +func asMail(feed *Feed, item feeditem, cfg *config.Config) (string, error) { var b bytes.Buffer if err := writeToBuffer(&b, feed, item, cfg); err != nil { @@ -108,7 +108,7 @@ func asMail(feed *Feed, item feeditem, cfg config.Config) (string, error) { return b.String(), nil } -func (feed *Feed) ToMails(cfg config.Config) ([]string, error) { +func (feed *Feed) ToMails(cfg *config.Config) ([]string, error) { var ( err error mails = make([]string, len(feed.items)) diff --git a/internal/imap/client.go b/internal/imap/client.go index 3f004e2..da06f54 100644 --- a/internal/imap/client.go +++ b/internal/imap/client.go @@ -17,6 +17,7 @@ type Client struct { mailboxes mailboxes delimiter string toplevel Folder + commander *commander } type Folder struct { @@ -50,6 +51,8 @@ func (mbs mailboxes) add(elem *imap.MailboxInfo) { func (client *Client) Disconnect() { if client != nil { + client.stopCommander() + connected := (client.c.State() & imap.ConnectedState) != 0 _ = client.c.Logout() @@ -119,7 +122,7 @@ func (client *Client) fetchDelimiter() error { return nil } -func (client *Client) EnsureFolder(folder Folder) error { +func (client *Client) ensureFolder(folder Folder) error { if client.mailboxes.contains(folder) { return nil } @@ -146,6 +149,26 @@ func (client *Client) EnsureFolder(folder Folder) error { } } -func (client *Client) PutMessage(folder Folder, message string, date time.Time) error { - return client.c.Append(folder.String(), nil, date, strings.NewReader(message)) +func (client *Client) EnsureFolder(folder Folder, errorHandler ErrorHandler) { + client.commander.execute(ensureCommando{folder}, errorHandler) +} + +func (client *Client) putMessages(folder Folder, messages []string) error { + if len(messages) == 0 { + return nil + } + + now := time.Now() + for _, msg := range messages { + reader := strings.NewReader(msg) + if err := client.c.Append(folder.str, nil, now, reader); err != nil { + return fmt.Errorf("uploading message to %s: %w", folder, err) + } + } + + return nil +} + +func (client *Client) PutMessages(folder Folder, messages []string, errorHandler ErrorHandler) { + client.commander.execute(addCommando{folder, messages}, errorHandler) } diff --git a/internal/imap/commando.go b/internal/imap/commando.go new file mode 100644 index 0000000..fbfeaec --- /dev/null +++ b/internal/imap/commando.go @@ -0,0 +1,92 @@ +package imap + +import "github.com/Necoro/feed2imap-go/internal/log" + +const maxPipeDepth = 10 + +type commander struct { + client *Client + pipe chan<- execution + done chan<- struct{} +} + +type command interface { + execute(client *Client) error +} + +type ErrorHandler func(error) string + +type execution struct { + cmd command + done chan<- struct{} + errorHandler ErrorHandler +} + +type addCommando struct { + folder Folder + messages []string +} + +func (cmd addCommando) execute(client *Client) error { + return client.putMessages(cmd.folder, cmd.messages) +} + +type ensureCommando struct { + folder Folder +} + +func (cmd ensureCommando) execute(client *Client) error { + return client.ensureFolder(cmd.folder) +} + +func (commander *commander) execute(command command, handler ErrorHandler) { + done := make(chan struct{}) + commander.pipe <- execution{command, done, handler} + <-done +} + +func executioner(client *Client, pipe <-chan execution, done <-chan struct{}) { + for { + select { + case <-done: + return + case execution := <-pipe: + select { // break as soon as done is there + case <-done: + return + default: + } + if err := execution.cmd.execute(client); err != nil { + if execution.errorHandler == nil { + log.Error(err) + } else { + log.Error(execution.errorHandler(err)) + } + } + close(execution.done) + } + } +} + +func (client *Client) startCommander() { + if client.commander != nil { + return + } + + pipe := make(chan execution, maxPipeDepth) + done := make(chan struct{}) + + client.commander = &commander{client, pipe, done} + + go executioner(client, pipe, done) +} + +func (client *Client) stopCommander() { + if client.commander == nil { + return + } + + close(client.commander.done) + + client.commander = nil +} diff --git a/internal/imap/imap.go b/internal/imap/imap.go index f284f81..4965ca5 100644 --- a/internal/imap/imap.go +++ b/internal/imap/imap.go @@ -121,9 +121,11 @@ func Connect(url *url.URL) (*Client, error) { log.Printf("Determined '%s' as toplevel, with '%s' as delimiter", client.toplevel, client.delimiter) - if err = client.EnsureFolder(client.toplevel); err != nil { + if err = client.ensureFolder(client.toplevel); err != nil { return nil, err } + client.startCommander() + return &client, nil } diff --git a/internal/yaml/yaml.go b/internal/yaml/yaml.go index 207adab..9dc2615 100644 --- a/internal/yaml/yaml.go +++ b/internal/yaml/yaml.go @@ -111,24 +111,24 @@ func buildFeeds(cfg []configGroupFeed, target []string, feeds F.Feeds) error { return nil } -func Load(path string) (C.Config, F.Feeds, error) { +func Load(path string) (*C.Config, F.Feeds, error) { buf, err := ioutil.ReadFile(path) if err != nil { - return C.Config{}, nil, fmt.Errorf("while reading '%s': %w", path, err) + return nil, nil, fmt.Errorf("while reading '%s': %w", path, err) } var parsedCfg config if parsedCfg, err = parse(buf); err != nil { - return C.Config{}, nil, err + return nil, nil, err } feeds := F.Feeds{} if err := buildFeeds(parsedCfg.Feeds, []string{}, feeds); err != nil { - return C.Config{}, nil, fmt.Errorf("while parsing: %w", err) + return nil, nil, fmt.Errorf("while parsing: %w", err) } - return C.Config{ + return &C.Config{ GlobalOptions: parsedCfg.GlobalOptions, GlobalConfig: parsedCfg.GlobalConfig, }, feeds, nil diff --git a/main.go b/main.go index 8e4b64f..0fd5ba4 100644 --- a/main.go +++ b/main.go @@ -5,8 +5,9 @@ import ( "fmt" "net/url" "os" - "time" + "sync" + "github.com/Necoro/feed2imap-go/internal/config" "github.com/Necoro/feed2imap-go/internal/feed" "github.com/Necoro/feed2imap-go/internal/imap" "github.com/Necoro/feed2imap-go/internal/log" @@ -16,6 +17,31 @@ import ( var cfgFile = flag.String("f", "config.yml", "configuration file") var verbose = flag.Bool("v", false, "enable verbose output") +func processFeed(feed *feed.Feed, cfg *config.Config, client *imap.Client, wg *sync.WaitGroup) { + defer wg.Done() + + mails, err := feed.ToMails(cfg) + if err != nil { + log.Errorf("Processing items of feed %s: %s", feed.Name, err) + return + } + + if len(mails) == 0 { + return + } + + folder := client.NewFolder(feed.Target) + client.EnsureFolder(folder, func(err error) string { + return fmt.Sprintf("Creating folder of feed %s: %s", feed.Name, err) + }) + + client.PutMessages(folder, mails, func(err error) string { + return fmt.Sprintf("Uploading messages of feed %s: %s", feed.Name, err) + }) + + log.Printf("Uploaded %d messages to '%s' @ %s", len(mails), feed.Name, folder) +} + func run() error { flag.Parse() log.SetDebug(*verbose) @@ -46,25 +72,12 @@ func run() error { defer c.Disconnect() + var wg sync.WaitGroup + wg.Add(len(feeds)) for _, f := range feeds { - mails, err := f.ToMails(cfg) - if err != nil { - return err - } - if len(mails) == 0 { - continue - } - folder := c.NewFolder(f.Target) - if err = c.EnsureFolder(folder); err != nil { - return err - } - for _, mail := range mails { - if err = c.PutMessage(folder, mail, time.Now()); err != nil { - return err - } // TODO - } - log.Printf("Uploaded %d messages to '%s' @ %s", len(mails), f.Name, folder) + go processFeed(f, cfg, c, &wg) } + wg.Wait() return nil } -- cgit v1.2.3-54-g00ecf