aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRené 'Necoro' Neumann <necoro@necoro.eu>2020-04-25 11:27:34 +0200
committerRené 'Necoro' Neumann <necoro@necoro.eu>2020-04-25 11:27:34 +0200
commitd21881150c09986571a563eaf30bc1687787e63f (patch)
treea5da8a3fdb91a3dcf806b704e20b16616a934801
parentc08aff21cd67cc27926a4cb1ca72ffe67e015ebf (diff)
downloadfeed2imap-go-d21881150c09986571a563eaf30bc1687787e63f.tar.gz
feed2imap-go-d21881150c09986571a563eaf30bc1687787e63f.tar.bz2
feed2imap-go-d21881150c09986571a563eaf30bc1687787e63f.zip
Improved caching
-rw-r--r--internal/cache/cache.go120
-rw-r--r--internal/feed/cache.go230
-rw-r--r--internal/feed/feed.go64
-rw-r--r--internal/feed/parse.go24
-rw-r--r--internal/yaml/yaml.go15
-rw-r--r--main.go23
6 files changed, 315 insertions, 161 deletions
diff --git a/internal/cache/cache.go b/internal/cache/cache.go
deleted file mode 100644
index 979d661..0000000
--- a/internal/cache/cache.go
+++ /dev/null
@@ -1,120 +0,0 @@
-package cache
-
-import (
- "bufio"
- "encoding/gob"
- "errors"
- "fmt"
- "os"
-
- "github.com/Necoro/feed2imap-go/internal/log"
-)
-
-const currentVersion byte = 1
-
-type Cache interface {
- Version() byte
- transformToCurrent() (Cache, error)
-}
-
-type feedId struct {
- Name string
- Url string
-}
-
-type v1Cache struct {
- version byte
- Ids map[feedId]uint64
- NextId uint64
-}
-
-func (cache *v1Cache) Version() byte {
- return cache.version
-}
-
-func New() Cache {
- cache := v1Cache{Ids: map[feedId]uint64{}}
- cache.version = currentVersion
- return &cache
-}
-
-func cacheForVersion(version byte) (Cache, error) {
- switch version {
- case 1:
- return New(), nil
- default:
- return nil, fmt.Errorf("unknown cache version '%d'", version)
- }
-}
-
-func (cache *v1Cache) transformToCurrent() (Cache, error) {
- return cache, nil
-}
-
-func Store(fileName string, cache Cache) error {
- if cache == nil {
- return fmt.Errorf("trying to store nil cache")
- }
- if cache.Version() != currentVersion {
- return fmt.Errorf("trying to store cache with unsupported version '%d' (current: '%d')", cache.Version(), currentVersion)
- }
-
- f, err := os.Create(fileName)
- if err != nil {
- return fmt.Errorf("trying to store cache to '%s': %w", fileName, err)
- }
- defer f.Close()
-
- writer := bufio.NewWriter(f)
- if err = writer.WriteByte(currentVersion); err != nil {
- return fmt.Errorf("writing to '%s': %w", fileName, err)
- }
-
- encoder := gob.NewEncoder(writer)
- if err = encoder.Encode(cache); err != nil {
- return fmt.Errorf("encoding cache: %w", err)
- }
-
- writer.Flush()
- log.Printf("Stored cache to '%s'.", fileName)
-
- return nil
-}
-
-func Read(fileName string) (Cache, error) {
- f, err := os.Open(fileName)
- if err != nil {
- if errors.Is(err, os.ErrNotExist) {
- // no cache there yet -- make new
- return New(), nil
- }
- return nil, fmt.Errorf("opening cache at '%s': %w", fileName, err)
- }
- defer f.Close()
-
- log.Printf("Loading cache from '%s'", fileName)
-
- reader := bufio.NewReader(f)
- version, err := reader.ReadByte()
- if err != nil {
- return nil, fmt.Errorf("reading from '%s': %w", fileName, err)
- }
-
- cache, err := cacheForVersion(version)
- if err != nil {
- return nil, err
- }
-
- decoder := gob.NewDecoder(reader)
- if err = decoder.Decode(cache); err != nil {
- return nil, fmt.Errorf("decoding for version '%d' from '%s': %w", version, fileName, err)
- }
-
- if cache, err = cache.transformToCurrent(); err != nil {
- return nil, fmt.Errorf("cannot transform from version %d to %d: %w", version, currentVersion, err)
- }
-
- log.Printf("Loaded cache (version %d), transformed to version %d.", version, currentVersion)
-
- return cache, nil
-}
diff --git a/internal/feed/cache.go b/internal/feed/cache.go
new file mode 100644
index 0000000..4f27144
--- /dev/null
+++ b/internal/feed/cache.go
@@ -0,0 +1,230 @@
+package feed
+
+import (
+ "bufio"
+ "crypto/sha256"
+ "encoding/gob"
+ "errors"
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/Necoro/feed2imap-go/internal/log"
+)
+
+const (
+ currentVersion byte = 1
+ startFeedId uint64 = 1
+)
+
+type Cache interface {
+ findItem(*Feed) CachedFeed
+ Version() byte
+ transformToCurrent() (Cache, error)
+}
+
+type feedId uint64
+
+type feedDescriptor struct {
+ Name string
+ Url string
+}
+
+type CachedFeed interface {
+ Checked(withFailure bool)
+ Failures() uint
+}
+
+type cachedFeed struct {
+ LastCheck time.Time
+ NumFailures uint // can't be named `Failures` b/c it'll collide with the interface
+ Items []cachedItem
+}
+
+func (cf *cachedFeed) Checked(withFailure bool) {
+ cf.LastCheck = time.Now()
+ if withFailure {
+ cf.NumFailures++
+ } else {
+ cf.NumFailures = 0
+ }
+}
+
+func (cf *cachedFeed) Failures() uint {
+ return cf.NumFailures
+}
+
+type itemHash [sha256.Size]byte
+
+type cachedItem struct {
+ Uid string
+ Title string
+ Link string
+ Date time.Time
+ Updated time.Time
+ Creator string
+ Hash itemHash
+}
+
+type v1Cache struct {
+ version byte
+ Ids map[feedDescriptor]feedId
+ NextId uint64
+ Feeds map[feedId]*cachedFeed
+}
+
+func (cache *v1Cache) Version() byte {
+ return cache.version
+}
+
+func New() Cache {
+ cache := v1Cache{
+ Ids: map[feedDescriptor]feedId{},
+ Feeds: map[feedId]*cachedFeed{},
+ NextId: startFeedId,
+ }
+ cache.version = currentVersion
+ return &cache
+}
+
+func cacheForVersion(version byte) (Cache, error) {
+ switch version {
+ case 1:
+ return New(), nil
+ default:
+ return nil, fmt.Errorf("unknown cache version '%d'", version)
+ }
+}
+
+func (cache *v1Cache) transformToCurrent() (Cache, error) {
+ return cache, nil
+}
+
+func (cache *v1Cache) getItem(id feedId) CachedFeed {
+ feed, ok := cache.Feeds[id]
+ if !ok {
+ feed = &cachedFeed{}
+ cache.Feeds[id] = feed
+ }
+ return feed
+}
+
+func (cache *v1Cache) findItem(feed *Feed) CachedFeed {
+ if feed.cached != nil {
+ return feed.cached.(*cachedFeed)
+ }
+
+ fId := feedDescriptor{Name: feed.Name, Url: feed.Url}
+ id, ok := cache.Ids[fId]
+ if !ok {
+ var otherId feedDescriptor
+ changed := false
+ for otherId, id = range cache.Ids {
+ if otherId.Name == fId.Name {
+ log.Warnf("Feed %s seems to have changed URLs: New '%s', old '%s'. Updating.",
+ fId.Name, fId.Url, otherId.Url)
+ changed = true
+ break
+ } else if otherId.Url == fId.Url {
+ log.Warnf("Feed with URL '%s' seems to have changed its name: New '%s', old '%s'. Updating",
+ fId.Url, fId.Name, otherId.Name)
+ changed = true
+ break
+ }
+ }
+ if changed {
+ delete(cache.Ids, otherId)
+ } else {
+ id = feedId(cache.NextId)
+ cache.NextId++
+ }
+
+ cache.Ids[fId] = id
+ }
+
+ item := cache.getItem(id)
+ feed.cached = item
+ return item
+}
+
+func (feeds *Feeds) StoreCache(fileName string) error {
+ cache := feeds.cache
+ if cache == nil {
+ return fmt.Errorf("trying to store nil cache")
+ }
+ if cache.Version() != currentVersion {
+ return fmt.Errorf("trying to store cache with unsupported version '%d' (current: '%d')", cache.Version(), currentVersion)
+ }
+
+ f, err := os.Create(fileName)
+ if err != nil {
+ return fmt.Errorf("trying to store cache to '%s': %w", fileName, err)
+ }
+ defer f.Close()
+
+ writer := bufio.NewWriter(f)
+ if err = writer.WriteByte(currentVersion); err != nil {
+ return fmt.Errorf("writing to '%s': %w", fileName, err)
+ }
+
+ encoder := gob.NewEncoder(writer)
+ if err = encoder.Encode(cache); err != nil {
+ return fmt.Errorf("encoding cache: %w", err)
+ }
+
+ writer.Flush()
+ log.Printf("Stored cache to '%s'.", fileName)
+
+ return nil
+}
+
+func (feeds *Feeds) LoadCache(fileName string) error {
+ cache, err := loadCache(fileName)
+ if err != nil {
+ return err
+ }
+ feeds.cache = cache
+
+ for _, feed := range feeds.feeds {
+ feed.cached = cache.findItem(feed)
+ }
+ return nil
+}
+
+func loadCache(fileName string) (Cache, error) {
+ f, err := os.Open(fileName)
+ if err != nil {
+ if errors.Is(err, os.ErrNotExist) {
+ // no cache there yet -- make new
+ return New(), nil
+ }
+ return nil, fmt.Errorf("opening cache at '%s': %w", fileName, err)
+ }
+ defer f.Close()
+
+ log.Printf("Loading cache from '%s'", fileName)
+
+ reader := bufio.NewReader(f)
+ version, err := reader.ReadByte()
+ if err != nil {
+ return nil, fmt.Errorf("reading from '%s': %w", fileName, err)
+ }
+
+ cache, err := cacheForVersion(version)
+ if err != nil {
+ return nil, err
+ }
+
+ decoder := gob.NewDecoder(reader)
+ if err = decoder.Decode(cache); err != nil {
+ return nil, fmt.Errorf("decoding for version '%d' from '%s': %w", version, fileName, err)
+ }
+
+ if cache, err = cache.transformToCurrent(); err != nil {
+ return nil, fmt.Errorf("cannot transform from version %d to %d: %w", version, currentVersion, err)
+ }
+
+ log.Printf("Loaded cache (version %d), transformed to version %d.", version, currentVersion)
+
+ return cache, nil
+}
diff --git a/internal/feed/feed.go b/internal/feed/feed.go
index cd906a2..5af4188 100644
--- a/internal/feed/feed.go
+++ b/internal/feed/feed.go
@@ -3,10 +3,13 @@ package feed
import (
"fmt"
"strings"
+ "sync"
+ "time"
"github.com/mmcdole/gofeed"
"github.com/Necoro/feed2imap-go/internal/config"
+ "github.com/Necoro/feed2imap-go/internal/log"
)
type Feed struct {
@@ -14,8 +17,9 @@ type Feed struct {
Target []string
Url string
config.Options
- feed *gofeed.Feed
- items []feeditem
+ feed *gofeed.Feed
+ items []feeditem
+ cached CachedFeed
}
type feeditem struct {
@@ -23,9 +27,18 @@ type feeditem struct {
*gofeed.Item
}
-type Feeds map[string]*Feed
+type Feeds struct {
+ feeds map[string]*Feed
+ cache Cache
+}
+
+func NewFeeds() *Feeds {
+ return &Feeds{
+ feeds: map[string]*Feed{},
+ }
+}
-func (f Feeds) String() string {
+func (feeds *Feeds) String() string {
var b strings.Builder
app := func(a ...interface{}) {
_, _ = fmt.Fprint(&b, a...)
@@ -33,7 +46,7 @@ func (f Feeds) String() string {
app("Feeds [")
first := true
- for k, v := range f {
+ for k, v := range feeds.feeds {
if !first {
app(", ")
}
@@ -49,3 +62,44 @@ func (f Feeds) String() string {
return b.String()
}
+
+func (feeds *Feeds) Len() int {
+ return len(feeds.feeds)
+}
+
+func (feeds *Feeds) Contains(name string) bool {
+ _, ok := feeds.feeds[name]
+ return ok
+}
+
+func (feeds *Feeds) Set(name string, feed *Feed) {
+ feeds.feeds[name] = feed
+}
+
+func (feeds *Feeds) Foreach(f func(*Feed)) {
+ for _, feed := range feeds.feeds {
+ f(feed)
+ }
+}
+
+func (feeds *Feeds) ForeachGo(goFunc func(*Feed, *sync.WaitGroup)) {
+ var wg sync.WaitGroup
+ wg.Add(feeds.Len())
+
+ for _, feed := range feeds.feeds {
+ go goFunc(feed, &wg)
+ }
+ wg.Wait()
+}
+
+func (feed *Feed) NeedsUpdate(updateTime time.Time) bool {
+ if !updateTime.IsZero() && int(time.Since(updateTime).Hours()) >= feed.MinFreq {
+ log.Printf("Feed '%s' does not need updating, skipping.", feed.Name)
+ return false
+ }
+ return true
+}
+
+func (feed *Feed) Success() bool {
+ return feed.feed != nil
+}
diff --git a/internal/feed/parse.go b/internal/feed/parse.go
index 35a7596..6deebb2 100644
--- a/internal/feed/parse.go
+++ b/internal/feed/parse.go
@@ -32,7 +32,7 @@ func parseFeed(feed *Feed) error {
return nil
}
-func handleFeed(feed *Feed, group *sync.WaitGroup, success chan<- bool) {
+func handleFeed(feed *Feed, group *sync.WaitGroup) {
defer group.Done()
log.Printf("Fetching %s from %s", feed.Name, feed.Url)
@@ -40,25 +40,17 @@ func handleFeed(feed *Feed, group *sync.WaitGroup, success chan<- bool) {
if err != nil {
log.Error(err)
}
- success <- err == nil
}
-func Parse(feeds Feeds) int {
- var wg sync.WaitGroup
- wg.Add(len(feeds))
-
- success := make(chan bool, len(feeds))
-
- for _, feed := range feeds {
- go handleFeed(feed, &wg, success)
- }
-
- wg.Wait()
- close(success)
+func (feeds Feeds) Parse() int {
+ feeds.ForeachGo(handleFeed)
ctr := 0
- for s := range success {
- if s {
+ for _, feed := range feeds.feeds {
+ success := feed.Success()
+ feed.cached.Checked(!success)
+
+ if success {
ctr++
}
}
diff --git a/internal/yaml/yaml.go b/internal/yaml/yaml.go
index 9dc2615..23a38ef 100644
--- a/internal/yaml/yaml.go
+++ b/internal/yaml/yaml.go
@@ -8,6 +8,7 @@ import (
C "github.com/Necoro/feed2imap-go/internal/config"
F "github.com/Necoro/feed2imap-go/internal/feed"
+ "github.com/Necoro/feed2imap-go/internal/log"
)
type config struct {
@@ -77,7 +78,7 @@ func appTarget(target []string, app string) []string {
}
// Parse the group structure and populate the `Target` fields in the feeds
-func buildFeeds(cfg []configGroupFeed, target []string, feeds F.Feeds) error {
+func buildFeeds(cfg []configGroupFeed, target []string, feeds *F.Feeds) error {
for idx := range cfg {
f := &cfg[idx] // cannot use `_, f := range cfg` as it returns copies(!), but we need the originals
target := appTarget(target, f.target())
@@ -91,15 +92,15 @@ func buildFeeds(cfg []configGroupFeed, target []string, feeds F.Feeds) error {
return fmt.Errorf("Unnamed feed")
}
- if _, ok := feeds[name]; ok {
+ if feeds.Contains(name) {
return fmt.Errorf("Duplicate Feed Name '%s'", name)
}
- feeds[name] = &F.Feed{
+ feeds.Set(name, &F.Feed{
Name: f.Feed.Name,
Target: target,
Url: f.Feed.Url,
Options: f.Feed.Options,
- }
+ })
case f.isGroup():
if err := buildFeeds(f.Group.Feeds, target, feeds); err != nil {
@@ -111,7 +112,9 @@ func buildFeeds(cfg []configGroupFeed, target []string, feeds F.Feeds) error {
return nil
}
-func Load(path string) (*C.Config, F.Feeds, error) {
+func Load(path string) (*C.Config, *F.Feeds, error) {
+ log.Printf("Reading configuration file '%s'", path)
+
buf, err := ioutil.ReadFile(path)
if err != nil {
return nil, nil, fmt.Errorf("while reading '%s': %w", path, err)
@@ -122,7 +125,7 @@ func Load(path string) (*C.Config, F.Feeds, error) {
return nil, nil, err
}
- feeds := F.Feeds{}
+ feeds := F.NewFeeds()
if err := buildFeeds(parsedCfg.Feeds, []string{}, feeds); err != nil {
return nil, nil, fmt.Errorf("while parsing: %w", err)
diff --git a/main.go b/main.go
index c4afc11..1850710 100644
--- a/main.go
+++ b/main.go
@@ -7,7 +7,6 @@ import (
"os"
"sync"
- "github.com/Necoro/feed2imap-go/internal/cache"
"github.com/Necoro/feed2imap-go/internal/config"
"github.com/Necoro/feed2imap-go/internal/feed"
"github.com/Necoro/feed2imap-go/internal/imap"
@@ -52,7 +51,6 @@ func run() error {
log.Print("Starting up...")
- log.Printf("Reading configuration file '%s'", *cfgFile)
cfg, feeds, err := yaml.Load(*cfgFile)
if err != nil {
return err
@@ -62,15 +60,15 @@ func run() error {
return fmt.Errorf("Configuration invalid: %w", err)
}
- if success := feed.Parse(feeds); success == 0 {
- return fmt.Errorf("No successfull feed fetch.")
- }
-
- feedCache, err := cache.Read(*cacheFile)
+ err = feeds.LoadCache(*cacheFile)
if err != nil {
return err
}
+ if success := feeds.Parse(); success == 0 {
+ return fmt.Errorf("No successfull feed fetch.")
+ }
+
imapUrl, err := url.Parse(cfg.Target)
if err != nil {
return fmt.Errorf("parsing 'target': %w", err)
@@ -83,14 +81,11 @@ func run() error {
defer c.Disconnect()
- var wg sync.WaitGroup
- wg.Add(len(feeds))
- for _, f := range feeds {
- go processFeed(f, cfg, c, &wg)
- }
- wg.Wait()
+ feeds.ForeachGo(func(f *feed.Feed, wg *sync.WaitGroup) {
+ processFeed(f, cfg, c, wg)
+ })
- if err = cache.Store(*cacheFile, feedCache); err != nil {
+ if err = feeds.StoreCache(*cacheFile); err != nil {
return err
}