aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--internal/feed/mail.go8
-rw-r--r--internal/imap/client.go29
-rw-r--r--internal/imap/commando.go92
-rw-r--r--internal/imap/imap.go4
-rw-r--r--internal/yaml/yaml.go10
-rw-r--r--main.go49
6 files changed, 161 insertions, 31 deletions
diff --git a/internal/feed/mail.go b/internal/feed/mail.go
index 256c4fe..e8b6915 100644
--- a/internal/feed/mail.go
+++ b/internal/feed/mail.go
@@ -16,7 +16,7 @@ func address(name, address string) []*mail.Address {
return []*mail.Address{{Name: name, Address: address}}
}
-func fromAdress(feed *Feed, item feeditem, cfg config.Config) []*mail.Address {
+func fromAdress(feed *Feed, item feeditem, cfg *config.Config) []*mail.Address {
switch {
case item.Item.Author != nil && item.Item.Author.Email != "":
return address(item.Item.Author.Name, item.Item.Author.Email)
@@ -35,7 +35,7 @@ func writeHtml(writer io.Writer, item feeditem) error {
return template.Feed.Execute(writer, item)
}
-func writeToBuffer(b *bytes.Buffer, feed *Feed, item feeditem, cfg config.Config) error {
+func writeToBuffer(b *bytes.Buffer, feed *Feed, item feeditem, cfg *config.Config) error {
var h mail.Header
h.SetAddressList("From", fromAdress(feed, item, cfg))
h.SetAddressList("To", address(feed.Name, cfg.DefaultEmail))
@@ -98,7 +98,7 @@ func writeToBuffer(b *bytes.Buffer, feed *Feed, item feeditem, cfg config.Config
return nil
}
-func asMail(feed *Feed, item feeditem, cfg config.Config) (string, error) {
+func asMail(feed *Feed, item feeditem, cfg *config.Config) (string, error) {
var b bytes.Buffer
if err := writeToBuffer(&b, feed, item, cfg); err != nil {
@@ -108,7 +108,7 @@ func asMail(feed *Feed, item feeditem, cfg config.Config) (string, error) {
return b.String(), nil
}
-func (feed *Feed) ToMails(cfg config.Config) ([]string, error) {
+func (feed *Feed) ToMails(cfg *config.Config) ([]string, error) {
var (
err error
mails = make([]string, len(feed.items))
diff --git a/internal/imap/client.go b/internal/imap/client.go
index 3f004e2..da06f54 100644
--- a/internal/imap/client.go
+++ b/internal/imap/client.go
@@ -17,6 +17,7 @@ type Client struct {
mailboxes mailboxes
delimiter string
toplevel Folder
+ commander *commander
}
type Folder struct {
@@ -50,6 +51,8 @@ func (mbs mailboxes) add(elem *imap.MailboxInfo) {
func (client *Client) Disconnect() {
if client != nil {
+ client.stopCommander()
+
connected := (client.c.State() & imap.ConnectedState) != 0
_ = client.c.Logout()
@@ -119,7 +122,7 @@ func (client *Client) fetchDelimiter() error {
return nil
}
-func (client *Client) EnsureFolder(folder Folder) error {
+func (client *Client) ensureFolder(folder Folder) error {
if client.mailboxes.contains(folder) {
return nil
}
@@ -146,6 +149,26 @@ func (client *Client) EnsureFolder(folder Folder) error {
}
}
-func (client *Client) PutMessage(folder Folder, message string, date time.Time) error {
- return client.c.Append(folder.String(), nil, date, strings.NewReader(message))
+func (client *Client) EnsureFolder(folder Folder, errorHandler ErrorHandler) {
+ client.commander.execute(ensureCommando{folder}, errorHandler)
+}
+
+func (client *Client) putMessages(folder Folder, messages []string) error {
+ if len(messages) == 0 {
+ return nil
+ }
+
+ now := time.Now()
+ for _, msg := range messages {
+ reader := strings.NewReader(msg)
+ if err := client.c.Append(folder.str, nil, now, reader); err != nil {
+ return fmt.Errorf("uploading message to %s: %w", folder, err)
+ }
+ }
+
+ return nil
+}
+
+func (client *Client) PutMessages(folder Folder, messages []string, errorHandler ErrorHandler) {
+ client.commander.execute(addCommando{folder, messages}, errorHandler)
}
diff --git a/internal/imap/commando.go b/internal/imap/commando.go
new file mode 100644
index 0000000..fbfeaec
--- /dev/null
+++ b/internal/imap/commando.go
@@ -0,0 +1,92 @@
+package imap
+
+import "github.com/Necoro/feed2imap-go/internal/log"
+
+const maxPipeDepth = 10
+
+type commander struct {
+ client *Client
+ pipe chan<- execution
+ done chan<- struct{}
+}
+
+type command interface {
+ execute(client *Client) error
+}
+
+type ErrorHandler func(error) string
+
+type execution struct {
+ cmd command
+ done chan<- struct{}
+ errorHandler ErrorHandler
+}
+
+type addCommando struct {
+ folder Folder
+ messages []string
+}
+
+func (cmd addCommando) execute(client *Client) error {
+ return client.putMessages(cmd.folder, cmd.messages)
+}
+
+type ensureCommando struct {
+ folder Folder
+}
+
+func (cmd ensureCommando) execute(client *Client) error {
+ return client.ensureFolder(cmd.folder)
+}
+
+func (commander *commander) execute(command command, handler ErrorHandler) {
+ done := make(chan struct{})
+ commander.pipe <- execution{command, done, handler}
+ <-done
+}
+
+func executioner(client *Client, pipe <-chan execution, done <-chan struct{}) {
+ for {
+ select {
+ case <-done:
+ return
+ case execution := <-pipe:
+ select { // break as soon as done is there
+ case <-done:
+ return
+ default:
+ }
+ if err := execution.cmd.execute(client); err != nil {
+ if execution.errorHandler == nil {
+ log.Error(err)
+ } else {
+ log.Error(execution.errorHandler(err))
+ }
+ }
+ close(execution.done)
+ }
+ }
+}
+
+func (client *Client) startCommander() {
+ if client.commander != nil {
+ return
+ }
+
+ pipe := make(chan execution, maxPipeDepth)
+ done := make(chan struct{})
+
+ client.commander = &commander{client, pipe, done}
+
+ go executioner(client, pipe, done)
+}
+
+func (client *Client) stopCommander() {
+ if client.commander == nil {
+ return
+ }
+
+ close(client.commander.done)
+
+ client.commander = nil
+}
diff --git a/internal/imap/imap.go b/internal/imap/imap.go
index f284f81..4965ca5 100644
--- a/internal/imap/imap.go
+++ b/internal/imap/imap.go
@@ -121,9 +121,11 @@ func Connect(url *url.URL) (*Client, error) {
log.Printf("Determined '%s' as toplevel, with '%s' as delimiter", client.toplevel, client.delimiter)
- if err = client.EnsureFolder(client.toplevel); err != nil {
+ if err = client.ensureFolder(client.toplevel); err != nil {
return nil, err
}
+ client.startCommander()
+
return &client, nil
}
diff --git a/internal/yaml/yaml.go b/internal/yaml/yaml.go
index 207adab..9dc2615 100644
--- a/internal/yaml/yaml.go
+++ b/internal/yaml/yaml.go
@@ -111,24 +111,24 @@ func buildFeeds(cfg []configGroupFeed, target []string, feeds F.Feeds) error {
return nil
}
-func Load(path string) (C.Config, F.Feeds, error) {
+func Load(path string) (*C.Config, F.Feeds, error) {
buf, err := ioutil.ReadFile(path)
if err != nil {
- return C.Config{}, nil, fmt.Errorf("while reading '%s': %w", path, err)
+ return nil, nil, fmt.Errorf("while reading '%s': %w", path, err)
}
var parsedCfg config
if parsedCfg, err = parse(buf); err != nil {
- return C.Config{}, nil, err
+ return nil, nil, err
}
feeds := F.Feeds{}
if err := buildFeeds(parsedCfg.Feeds, []string{}, feeds); err != nil {
- return C.Config{}, nil, fmt.Errorf("while parsing: %w", err)
+ return nil, nil, fmt.Errorf("while parsing: %w", err)
}
- return C.Config{
+ return &C.Config{
GlobalOptions: parsedCfg.GlobalOptions,
GlobalConfig: parsedCfg.GlobalConfig,
}, feeds, nil
diff --git a/main.go b/main.go
index 8e4b64f..0fd5ba4 100644
--- a/main.go
+++ b/main.go
@@ -5,8 +5,9 @@ import (
"fmt"
"net/url"
"os"
- "time"
+ "sync"
+ "github.com/Necoro/feed2imap-go/internal/config"
"github.com/Necoro/feed2imap-go/internal/feed"
"github.com/Necoro/feed2imap-go/internal/imap"
"github.com/Necoro/feed2imap-go/internal/log"
@@ -16,6 +17,31 @@ import (
var cfgFile = flag.String("f", "config.yml", "configuration file")
var verbose = flag.Bool("v", false, "enable verbose output")
+func processFeed(feed *feed.Feed, cfg *config.Config, client *imap.Client, wg *sync.WaitGroup) {
+ defer wg.Done()
+
+ mails, err := feed.ToMails(cfg)
+ if err != nil {
+ log.Errorf("Processing items of feed %s: %s", feed.Name, err)
+ return
+ }
+
+ if len(mails) == 0 {
+ return
+ }
+
+ folder := client.NewFolder(feed.Target)
+ client.EnsureFolder(folder, func(err error) string {
+ return fmt.Sprintf("Creating folder of feed %s: %s", feed.Name, err)
+ })
+
+ client.PutMessages(folder, mails, func(err error) string {
+ return fmt.Sprintf("Uploading messages of feed %s: %s", feed.Name, err)
+ })
+
+ log.Printf("Uploaded %d messages to '%s' @ %s", len(mails), feed.Name, folder)
+}
+
func run() error {
flag.Parse()
log.SetDebug(*verbose)
@@ -46,25 +72,12 @@ func run() error {
defer c.Disconnect()
+ var wg sync.WaitGroup
+ wg.Add(len(feeds))
for _, f := range feeds {
- mails, err := f.ToMails(cfg)
- if err != nil {
- return err
- }
- if len(mails) == 0 {
- continue
- }
- folder := c.NewFolder(f.Target)
- if err = c.EnsureFolder(folder); err != nil {
- return err
- }
- for _, mail := range mails {
- if err = c.PutMessage(folder, mail, time.Now()); err != nil {
- return err
- } // TODO
- }
- log.Printf("Uploaded %d messages to '%s' @ %s", len(mails), f.Name, folder)
+ go processFeed(f, cfg, c, &wg)
}
+ wg.Wait()
return nil
}