From 6bd87a567ef481b922f6baec2b475ec376c45443 Mon Sep 17 00:00:00 2001 From: René 'Necoro' Neumann Date: Thu, 23 Apr 2020 20:10:05 +0200 Subject: Concurrent imap --- internal/imap/client.go | 141 ++++++++++++++++++++++++++++++++++++---------- internal/imap/commando.go | 43 ++++++-------- internal/imap/imap.go | 82 ++++++++++++++++----------- 3 files changed, 176 insertions(+), 90 deletions(-) (limited to 'internal') diff --git a/internal/imap/client.go b/internal/imap/client.go index da06f54..5e2546b 100644 --- a/internal/imap/client.go +++ b/internal/imap/client.go @@ -3,6 +3,7 @@ package imap import ( "fmt" "strings" + "sync" "time" "github.com/emersion/go-imap" @@ -11,20 +12,37 @@ import ( "github.com/Necoro/feed2imap-go/internal/log" ) -type Client struct { - c *imapClient.Client +const numberConns = 5 + +type connConf struct { host string - mailboxes mailboxes delimiter string toplevel Folder +} + +type connection struct { + *connConf + mailboxes *mailboxes + c *imapClient.Client +} + +type mailboxes struct { + mb map[string]*imap.MailboxInfo + mu sync.RWMutex +} + +type Client struct { + connConf + mailboxes mailboxes commander *commander + connections [numberConns]*connection + nextFreeIndex int } type Folder struct { str string delimiter string } -type mailboxes map[string]*imap.MailboxInfo func (f Folder) String() string { return f.str @@ -40,21 +58,38 @@ func (f Folder) Append(other Folder) Folder { } } -func (mbs mailboxes) contains(elem Folder) bool { - _, ok := mbs[elem.str] +func (mbs *mailboxes) contains(elem Folder) bool { + mbs.mu.RLock() + defer mbs.mu.RUnlock() + + _, ok := mbs.mb[elem.str] return ok } -func (mbs mailboxes) add(elem *imap.MailboxInfo) { - mbs[elem.Name] = elem +func (mbs *mailboxes) add(elem *imap.MailboxInfo) { + mbs.mu.Lock() + defer mbs.mu.Unlock() + + mbs.mb[elem.Name] = elem +} + +func (conn *connection) Disconnect() bool { + if conn != nil { + connected := (conn.c.State() & imap.ConnectedState) != 0 + _ = conn.c.Logout() + return connected + } + return false } func (client *Client) Disconnect() { if client != nil { client.stopCommander() - connected := (client.c.State() & imap.ConnectedState) != 0 - _ = client.c.Logout() + connected := false + for _, conn := range client.connections { + connected = conn.Disconnect() || connected + } if connected { log.Print("Disconnected from ", client.host) @@ -73,13 +108,13 @@ func (client *Client) NewFolder(path []string) Folder { return client.toplevel.Append(client.folderName(path)) } -func (client *Client) createFolder(folder string) error { - err := client.c.Create(folder) +func (conn *connection) createFolder(folder string) error { + err := conn.c.Create(folder) if err != nil { return fmt.Errorf("creating folder '%s': %w", folder, err) } - err = client.c.Subscribe(folder) + err = conn.c.Subscribe(folder) if err != nil { return fmt.Errorf("subscribing to folder '%s': %w", folder, err) } @@ -89,11 +124,11 @@ func (client *Client) createFolder(folder string) error { return nil } -func (client *Client) list(folder string) (*imap.MailboxInfo, int, error) { +func (conn *connection) list(folder string) (*imap.MailboxInfo, int, error) { mailboxes := make(chan *imap.MailboxInfo, 10) done := make(chan error, 1) go func() { - done <- client.c.List("", folder, mailboxes) + done <- conn.c.List("", folder, mailboxes) }() found := 0 @@ -112,24 +147,23 @@ func (client *Client) list(folder string) (*imap.MailboxInfo, int, error) { return mbox, found, nil } -func (client *Client) fetchDelimiter() error { - mbox, _, err := client.list("") +func (conn *connection) fetchDelimiter() (string, error) { + mbox, _, err := conn.list("") if err != nil { - return err + return "", err } - client.delimiter = mbox.Delimiter - return nil + return mbox.Delimiter, nil } -func (client *Client) ensureFolder(folder Folder) error { - if client.mailboxes.contains(folder) { +func (conn *connection) ensureFolder(folder Folder) error { + if conn.mailboxes.contains(folder) { return nil } log.Printf("Checking for folder '%s'", folder) - mbox, found, err := client.list(folder.str) + mbox, found, err := conn.list(folder.str) if err != nil { return err } @@ -140,20 +174,20 @@ func (client *Client) ensureFolder(folder Folder) error { switch found { case 0: - return client.createFolder(folder.str) + return conn.createFolder(folder.str) case 1: - client.mailboxes.add(mbox) + conn.mailboxes.add(mbox) return nil default: return fmt.Errorf("Found multiple folders matching '%s'.", folder) } } -func (client *Client) EnsureFolder(folder Folder, errorHandler ErrorHandler) { - client.commander.execute(ensureCommando{folder}, errorHandler) +func (client *Client) EnsureFolder(folder Folder) error { + return client.commander.execute(ensureCommando{folder}) } -func (client *Client) putMessages(folder Folder, messages []string) error { +func (conn *connection) putMessages(folder Folder, messages []string) error { if len(messages) == 0 { return nil } @@ -161,7 +195,7 @@ func (client *Client) putMessages(folder Folder, messages []string) error { now := time.Now() for _, msg := range messages { reader := strings.NewReader(msg) - if err := client.c.Append(folder.str, nil, now, reader); err != nil { + if err := conn.c.Append(folder.str, nil, now, reader); err != nil { return fmt.Errorf("uploading message to %s: %w", folder, err) } } @@ -169,6 +203,51 @@ func (client *Client) putMessages(folder Folder, messages []string) error { return nil } -func (client *Client) PutMessages(folder Folder, messages []string, errorHandler ErrorHandler) { - client.commander.execute(addCommando{folder, messages}, errorHandler) +func (client *Client) PutMessages(folder Folder, messages []string) error { + return client.commander.execute(addCommando{folder, messages}) +} + +func (client *Client) createConnection(c *imapClient.Client) *connection{ + if client.nextFreeIndex >= len(client.connections) { + panic("Too many connections") + } + + conn := &connection{ + connConf: &client.connConf, + mailboxes: &client.mailboxes, + c: c, + } + + client.connections[client.nextFreeIndex] = conn + client.nextFreeIndex++ + + return conn +} + +func (conn *connection) startTls() error { + hasStartTls, err := conn.c.SupportStartTLS() + if err != nil { + return fmt.Errorf("checking for starttls for %s: %w", conn.host, err) + } + + if hasStartTls { + if err = conn.c.StartTLS(nil); err != nil { + return fmt.Errorf("enabling starttls for %s: %w", conn.host, err) + } + + log.Print("Connected to ", conn.host, " (STARTTLS)") + } else { + log.Print("Connected to ", conn.host, " (Plain)") + } + + return nil } + +func NewClient() *Client { + return &Client{ + mailboxes: mailboxes{ + mb: map[string]*imap.MailboxInfo{}, + mu: sync.RWMutex{}, + }, + } +} \ No newline at end of file diff --git a/internal/imap/commando.go b/internal/imap/commando.go index fbfeaec..171a507 100644 --- a/internal/imap/commando.go +++ b/internal/imap/commando.go @@ -1,7 +1,5 @@ package imap -import "github.com/Necoro/feed2imap-go/internal/log" - const maxPipeDepth = 10 type commander struct { @@ -11,15 +9,12 @@ type commander struct { } type command interface { - execute(client *Client) error + execute(*connection) error } -type ErrorHandler func(error) string - type execution struct { cmd command - done chan<- struct{} - errorHandler ErrorHandler + done chan<- error } type addCommando struct { @@ -27,25 +22,25 @@ type addCommando struct { messages []string } -func (cmd addCommando) execute(client *Client) error { - return client.putMessages(cmd.folder, cmd.messages) +func (cmd addCommando) execute(conn *connection) error { + return conn.putMessages(cmd.folder, cmd.messages) } type ensureCommando struct { folder Folder } -func (cmd ensureCommando) execute(client *Client) error { - return client.ensureFolder(cmd.folder) +func (cmd ensureCommando) execute(conn *connection) error { + return conn.ensureFolder(cmd.folder) } -func (commander *commander) execute(command command, handler ErrorHandler) { - done := make(chan struct{}) - commander.pipe <- execution{command, done, handler} - <-done +func (commander *commander) execute(command command) error { + done := make(chan error) + commander.pipe <- execution{command, done} + return <-done } -func executioner(client *Client, pipe <-chan execution, done <-chan struct{}) { +func executioner(conn *connection, pipe <-chan execution, done <-chan struct{}) { for { select { case <-done: @@ -56,14 +51,8 @@ func executioner(client *Client, pipe <-chan execution, done <-chan struct{}) { return default: } - if err := execution.cmd.execute(client); err != nil { - if execution.errorHandler == nil { - log.Error(err) - } else { - log.Error(execution.errorHandler(err)) - } - } - close(execution.done) + err := execution.cmd.execute(conn) + execution.done <- err } } } @@ -78,7 +67,11 @@ func (client *Client) startCommander() { client.commander = &commander{client, pipe, done} - go executioner(client, pipe, done) + for _, conn := range client.connections { + if conn != nil { + go executioner(conn, pipe, done) + } + } } func (client *Client) stopCommander() { diff --git a/internal/imap/imap.go b/internal/imap/imap.go index 4965ca5..52c61f0 100644 --- a/internal/imap/imap.go +++ b/internal/imap/imap.go @@ -56,51 +56,34 @@ func sanitizeUrl(url *url.URL) { setDefaultPort(url) } -func Connect(url *url.URL) (*Client, error) { - var c *imapClient.Client - var err error - - sanitizeUrl(url) - - forceTls := forceTLS(url) - +func newImapClient(url *url.URL, forceTls bool) (*imapClient.Client,error) { if forceTls { - c, err = imapClient.DialTLS(url.Host, nil) + c, err := imapClient.DialTLS(url.Host, nil) if err != nil { return nil, fmt.Errorf("connecting (TLS) to %s: %w", url.Host, err) } log.Print("Connected to ", url.Host, " (TLS)") + return c, nil } else { - c, err = imapClient.Dial(url.Host) + c, err := imapClient.Dial(url.Host) if err != nil { return nil, fmt.Errorf("connecting to %s: %w", url.Host, err) } + return c, nil } +} - var client = Client{c: c, host: url.Host, mailboxes: mailboxes{}} +func (client *Client) connect(url *url.URL, forceTls bool) (*connection, error) { + c, err := newImapClient(url, forceTls) + if err != nil { + return nil, err + } - defer func() { - if err != nil { - client.Disconnect() - } - }() + conn := client.createConnection(c) if !forceTls { - var hasStartTls bool // explicit to avoid shadowing err - - hasStartTls, err = c.SupportStartTLS() - if err != nil { - return nil, fmt.Errorf("checking for starttls for %s: %w", url.Host, err) - } - - if hasStartTls { - if err = c.StartTLS(nil); err != nil { - return nil, fmt.Errorf("enabling starttls for %s: %w", url.Host, err) - } - - log.Print("Connected to ", url.Host, " (STARTTLS)") - } else { - log.Print("Connected to ", url.Host, " (Plain)") + if err = conn.startTls(); err != nil { + return nil, err } } @@ -109,9 +92,33 @@ func Connect(url *url.URL) (*Client, error) { return nil, fmt.Errorf("login to %s: %w", url.Host, err) } - if err = client.fetchDelimiter(); err != nil { + return conn, nil +} + +func Connect(url *url.URL) (*Client, error) { + var err error + + sanitizeUrl(url) + forceTls := forceTLS(url) + + client := NewClient() + client.host = url.Host + defer func() { + if err != nil { + client.Disconnect() + } + }() + + var conn *connection // the main connection + if conn, err = client.connect(url, forceTls); err != nil { + return nil, err + } + + delim, err := conn.fetchDelimiter() + if err != nil { return nil, fmt.Errorf("fetching delimiter: %w", err) } + client.delimiter = delim toplevel := url.Path if toplevel[0] == '/' { @@ -121,11 +128,18 @@ func Connect(url *url.URL) (*Client, error) { log.Printf("Determined '%s' as toplevel, with '%s' as delimiter", client.toplevel, client.delimiter) - if err = client.ensureFolder(client.toplevel); err != nil { + if err = conn.ensureFolder(client.toplevel); err != nil { return nil, err } + // the other connections + for i := 1; i < len(client.connections); i++ { + if _, err := client.connect(url, forceTls); err != nil { // explicitly new var 'err', b/c these are now harmless + log.Warnf("connecting #%d: %s", i, err) + } + } + client.startCommander() - return &client, nil + return client, nil } -- cgit v1.2.3-54-g00ecf