aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRené 'Necoro' Neumann <necoro@necoro.eu>2020-04-23 20:10:05 +0200
committerRené 'Necoro' Neumann <necoro@necoro.eu>2020-04-23 20:10:05 +0200
commit6bd87a567ef481b922f6baec2b475ec376c45443 (patch)
treed13c1995234acd3a07d16dafb009aa53ba05a7cb
parent7bfe97f3e72279a17c16803d64a16beb55944332 (diff)
downloadfeed2imap-go-6bd87a567ef481b922f6baec2b475ec376c45443.tar.gz
feed2imap-go-6bd87a567ef481b922f6baec2b475ec376c45443.tar.bz2
feed2imap-go-6bd87a567ef481b922f6baec2b475ec376c45443.zip
Concurrent imap
-rw-r--r--internal/imap/client.go141
-rw-r--r--internal/imap/commando.go43
-rw-r--r--internal/imap/imap.go82
-rw-r--r--main.go14
4 files changed, 184 insertions, 96 deletions
diff --git a/internal/imap/client.go b/internal/imap/client.go
index da06f54..5e2546b 100644
--- a/internal/imap/client.go
+++ b/internal/imap/client.go
@@ -3,6 +3,7 @@ package imap
import (
"fmt"
"strings"
+ "sync"
"time"
"github.com/emersion/go-imap"
@@ -11,20 +12,37 @@ import (
"github.com/Necoro/feed2imap-go/internal/log"
)
-type Client struct {
- c *imapClient.Client
+const numberConns = 5
+
+type connConf struct {
host string
- mailboxes mailboxes
delimiter string
toplevel Folder
+}
+
+type connection struct {
+ *connConf
+ mailboxes *mailboxes
+ c *imapClient.Client
+}
+
+type mailboxes struct {
+ mb map[string]*imap.MailboxInfo
+ mu sync.RWMutex
+}
+
+type Client struct {
+ connConf
+ mailboxes mailboxes
commander *commander
+ connections [numberConns]*connection
+ nextFreeIndex int
}
type Folder struct {
str string
delimiter string
}
-type mailboxes map[string]*imap.MailboxInfo
func (f Folder) String() string {
return f.str
@@ -40,21 +58,38 @@ func (f Folder) Append(other Folder) Folder {
}
}
-func (mbs mailboxes) contains(elem Folder) bool {
- _, ok := mbs[elem.str]
+func (mbs *mailboxes) contains(elem Folder) bool {
+ mbs.mu.RLock()
+ defer mbs.mu.RUnlock()
+
+ _, ok := mbs.mb[elem.str]
return ok
}
-func (mbs mailboxes) add(elem *imap.MailboxInfo) {
- mbs[elem.Name] = elem
+func (mbs *mailboxes) add(elem *imap.MailboxInfo) {
+ mbs.mu.Lock()
+ defer mbs.mu.Unlock()
+
+ mbs.mb[elem.Name] = elem
+}
+
+func (conn *connection) Disconnect() bool {
+ if conn != nil {
+ connected := (conn.c.State() & imap.ConnectedState) != 0
+ _ = conn.c.Logout()
+ return connected
+ }
+ return false
}
func (client *Client) Disconnect() {
if client != nil {
client.stopCommander()
- connected := (client.c.State() & imap.ConnectedState) != 0
- _ = client.c.Logout()
+ connected := false
+ for _, conn := range client.connections {
+ connected = conn.Disconnect() || connected
+ }
if connected {
log.Print("Disconnected from ", client.host)
@@ -73,13 +108,13 @@ func (client *Client) NewFolder(path []string) Folder {
return client.toplevel.Append(client.folderName(path))
}
-func (client *Client) createFolder(folder string) error {
- err := client.c.Create(folder)
+func (conn *connection) createFolder(folder string) error {
+ err := conn.c.Create(folder)
if err != nil {
return fmt.Errorf("creating folder '%s': %w", folder, err)
}
- err = client.c.Subscribe(folder)
+ err = conn.c.Subscribe(folder)
if err != nil {
return fmt.Errorf("subscribing to folder '%s': %w", folder, err)
}
@@ -89,11 +124,11 @@ func (client *Client) createFolder(folder string) error {
return nil
}
-func (client *Client) list(folder string) (*imap.MailboxInfo, int, error) {
+func (conn *connection) list(folder string) (*imap.MailboxInfo, int, error) {
mailboxes := make(chan *imap.MailboxInfo, 10)
done := make(chan error, 1)
go func() {
- done <- client.c.List("", folder, mailboxes)
+ done <- conn.c.List("", folder, mailboxes)
}()
found := 0
@@ -112,24 +147,23 @@ func (client *Client) list(folder string) (*imap.MailboxInfo, int, error) {
return mbox, found, nil
}
-func (client *Client) fetchDelimiter() error {
- mbox, _, err := client.list("")
+func (conn *connection) fetchDelimiter() (string, error) {
+ mbox, _, err := conn.list("")
if err != nil {
- return err
+ return "", err
}
- client.delimiter = mbox.Delimiter
- return nil
+ return mbox.Delimiter, nil
}
-func (client *Client) ensureFolder(folder Folder) error {
- if client.mailboxes.contains(folder) {
+func (conn *connection) ensureFolder(folder Folder) error {
+ if conn.mailboxes.contains(folder) {
return nil
}
log.Printf("Checking for folder '%s'", folder)
- mbox, found, err := client.list(folder.str)
+ mbox, found, err := conn.list(folder.str)
if err != nil {
return err
}
@@ -140,20 +174,20 @@ func (client *Client) ensureFolder(folder Folder) error {
switch found {
case 0:
- return client.createFolder(folder.str)
+ return conn.createFolder(folder.str)
case 1:
- client.mailboxes.add(mbox)
+ conn.mailboxes.add(mbox)
return nil
default:
return fmt.Errorf("Found multiple folders matching '%s'.", folder)
}
}
-func (client *Client) EnsureFolder(folder Folder, errorHandler ErrorHandler) {
- client.commander.execute(ensureCommando{folder}, errorHandler)
+func (client *Client) EnsureFolder(folder Folder) error {
+ return client.commander.execute(ensureCommando{folder})
}
-func (client *Client) putMessages(folder Folder, messages []string) error {
+func (conn *connection) putMessages(folder Folder, messages []string) error {
if len(messages) == 0 {
return nil
}
@@ -161,7 +195,7 @@ func (client *Client) putMessages(folder Folder, messages []string) error {
now := time.Now()
for _, msg := range messages {
reader := strings.NewReader(msg)
- if err := client.c.Append(folder.str, nil, now, reader); err != nil {
+ if err := conn.c.Append(folder.str, nil, now, reader); err != nil {
return fmt.Errorf("uploading message to %s: %w", folder, err)
}
}
@@ -169,6 +203,51 @@ func (client *Client) putMessages(folder Folder, messages []string) error {
return nil
}
-func (client *Client) PutMessages(folder Folder, messages []string, errorHandler ErrorHandler) {
- client.commander.execute(addCommando{folder, messages}, errorHandler)
+func (client *Client) PutMessages(folder Folder, messages []string) error {
+ return client.commander.execute(addCommando{folder, messages})
+}
+
+func (client *Client) createConnection(c *imapClient.Client) *connection{
+ if client.nextFreeIndex >= len(client.connections) {
+ panic("Too many connections")
+ }
+
+ conn := &connection{
+ connConf: &client.connConf,
+ mailboxes: &client.mailboxes,
+ c: c,
+ }
+
+ client.connections[client.nextFreeIndex] = conn
+ client.nextFreeIndex++
+
+ return conn
+}
+
+func (conn *connection) startTls() error {
+ hasStartTls, err := conn.c.SupportStartTLS()
+ if err != nil {
+ return fmt.Errorf("checking for starttls for %s: %w", conn.host, err)
+ }
+
+ if hasStartTls {
+ if err = conn.c.StartTLS(nil); err != nil {
+ return fmt.Errorf("enabling starttls for %s: %w", conn.host, err)
+ }
+
+ log.Print("Connected to ", conn.host, " (STARTTLS)")
+ } else {
+ log.Print("Connected to ", conn.host, " (Plain)")
+ }
+
+ return nil
}
+
+func NewClient() *Client {
+ return &Client{
+ mailboxes: mailboxes{
+ mb: map[string]*imap.MailboxInfo{},
+ mu: sync.RWMutex{},
+ },
+ }
+} \ No newline at end of file
diff --git a/internal/imap/commando.go b/internal/imap/commando.go
index fbfeaec..171a507 100644
--- a/internal/imap/commando.go
+++ b/internal/imap/commando.go
@@ -1,7 +1,5 @@
package imap
-import "github.com/Necoro/feed2imap-go/internal/log"
-
const maxPipeDepth = 10
type commander struct {
@@ -11,15 +9,12 @@ type commander struct {
}
type command interface {
- execute(client *Client) error
+ execute(*connection) error
}
-type ErrorHandler func(error) string
-
type execution struct {
cmd command
- done chan<- struct{}
- errorHandler ErrorHandler
+ done chan<- error
}
type addCommando struct {
@@ -27,25 +22,25 @@ type addCommando struct {
messages []string
}
-func (cmd addCommando) execute(client *Client) error {
- return client.putMessages(cmd.folder, cmd.messages)
+func (cmd addCommando) execute(conn *connection) error {
+ return conn.putMessages(cmd.folder, cmd.messages)
}
type ensureCommando struct {
folder Folder
}
-func (cmd ensureCommando) execute(client *Client) error {
- return client.ensureFolder(cmd.folder)
+func (cmd ensureCommando) execute(conn *connection) error {
+ return conn.ensureFolder(cmd.folder)
}
-func (commander *commander) execute(command command, handler ErrorHandler) {
- done := make(chan struct{})
- commander.pipe <- execution{command, done, handler}
- <-done
+func (commander *commander) execute(command command) error {
+ done := make(chan error)
+ commander.pipe <- execution{command, done}
+ return <-done
}
-func executioner(client *Client, pipe <-chan execution, done <-chan struct{}) {
+func executioner(conn *connection, pipe <-chan execution, done <-chan struct{}) {
for {
select {
case <-done:
@@ -56,14 +51,8 @@ func executioner(client *Client, pipe <-chan execution, done <-chan struct{}) {
return
default:
}
- if err := execution.cmd.execute(client); err != nil {
- if execution.errorHandler == nil {
- log.Error(err)
- } else {
- log.Error(execution.errorHandler(err))
- }
- }
- close(execution.done)
+ err := execution.cmd.execute(conn)
+ execution.done <- err
}
}
}
@@ -78,7 +67,11 @@ func (client *Client) startCommander() {
client.commander = &commander{client, pipe, done}
- go executioner(client, pipe, done)
+ for _, conn := range client.connections {
+ if conn != nil {
+ go executioner(conn, pipe, done)
+ }
+ }
}
func (client *Client) stopCommander() {
diff --git a/internal/imap/imap.go b/internal/imap/imap.go
index 4965ca5..52c61f0 100644
--- a/internal/imap/imap.go
+++ b/internal/imap/imap.go
@@ -56,51 +56,34 @@ func sanitizeUrl(url *url.URL) {
setDefaultPort(url)
}
-func Connect(url *url.URL) (*Client, error) {
- var c *imapClient.Client
- var err error
-
- sanitizeUrl(url)
-
- forceTls := forceTLS(url)
-
+func newImapClient(url *url.URL, forceTls bool) (*imapClient.Client,error) {
if forceTls {
- c, err = imapClient.DialTLS(url.Host, nil)
+ c, err := imapClient.DialTLS(url.Host, nil)
if err != nil {
return nil, fmt.Errorf("connecting (TLS) to %s: %w", url.Host, err)
}
log.Print("Connected to ", url.Host, " (TLS)")
+ return c, nil
} else {
- c, err = imapClient.Dial(url.Host)
+ c, err := imapClient.Dial(url.Host)
if err != nil {
return nil, fmt.Errorf("connecting to %s: %w", url.Host, err)
}
+ return c, nil
}
+}
- var client = Client{c: c, host: url.Host, mailboxes: mailboxes{}}
+func (client *Client) connect(url *url.URL, forceTls bool) (*connection, error) {
+ c, err := newImapClient(url, forceTls)
+ if err != nil {
+ return nil, err
+ }
- defer func() {
- if err != nil {
- client.Disconnect()
- }
- }()
+ conn := client.createConnection(c)
if !forceTls {
- var hasStartTls bool // explicit to avoid shadowing err
-
- hasStartTls, err = c.SupportStartTLS()
- if err != nil {
- return nil, fmt.Errorf("checking for starttls for %s: %w", url.Host, err)
- }
-
- if hasStartTls {
- if err = c.StartTLS(nil); err != nil {
- return nil, fmt.Errorf("enabling starttls for %s: %w", url.Host, err)
- }
-
- log.Print("Connected to ", url.Host, " (STARTTLS)")
- } else {
- log.Print("Connected to ", url.Host, " (Plain)")
+ if err = conn.startTls(); err != nil {
+ return nil, err
}
}
@@ -109,9 +92,33 @@ func Connect(url *url.URL) (*Client, error) {
return nil, fmt.Errorf("login to %s: %w", url.Host, err)
}
- if err = client.fetchDelimiter(); err != nil {
+ return conn, nil
+}
+
+func Connect(url *url.URL) (*Client, error) {
+ var err error
+
+ sanitizeUrl(url)
+ forceTls := forceTLS(url)
+
+ client := NewClient()
+ client.host = url.Host
+ defer func() {
+ if err != nil {
+ client.Disconnect()
+ }
+ }()
+
+ var conn *connection // the main connection
+ if conn, err = client.connect(url, forceTls); err != nil {
+ return nil, err
+ }
+
+ delim, err := conn.fetchDelimiter()
+ if err != nil {
return nil, fmt.Errorf("fetching delimiter: %w", err)
}
+ client.delimiter = delim
toplevel := url.Path
if toplevel[0] == '/' {
@@ -121,11 +128,18 @@ func Connect(url *url.URL) (*Client, error) {
log.Printf("Determined '%s' as toplevel, with '%s' as delimiter", client.toplevel, client.delimiter)
- if err = client.ensureFolder(client.toplevel); err != nil {
+ if err = conn.ensureFolder(client.toplevel); err != nil {
return nil, err
}
+ // the other connections
+ for i := 1; i < len(client.connections); i++ {
+ if _, err := client.connect(url, forceTls); err != nil { // explicitly new var 'err', b/c these are now harmless
+ log.Warnf("connecting #%d: %s", i, err)
+ }
+ }
+
client.startCommander()
- return &client, nil
+ return client, nil
}
diff --git a/main.go b/main.go
index 0fd5ba4..5764ae4 100644
--- a/main.go
+++ b/main.go
@@ -31,13 +31,15 @@ func processFeed(feed *feed.Feed, cfg *config.Config, client *imap.Client, wg *s
}
folder := client.NewFolder(feed.Target)
- client.EnsureFolder(folder, func(err error) string {
- return fmt.Sprintf("Creating folder of feed %s: %s", feed.Name, err)
- })
+ if err = client.EnsureFolder(folder); err != nil {
+ log.Errorf("Creating folder of feed %s: %s", feed.Name, err)
+ return
+ }
- client.PutMessages(folder, mails, func(err error) string {
- return fmt.Sprintf("Uploading messages of feed %s: %s", feed.Name, err)
- })
+ if err = client.PutMessages(folder, mails); err != nil {
+ log.Errorf("Uploading messages of feed %s: %s", feed.Name, err)
+ return
+ }
log.Printf("Uploaded %d messages to '%s' @ %s", len(mails), feed.Name, folder)
}