From d21881150c09986571a563eaf30bc1687787e63f Mon Sep 17 00:00:00 2001 From: René 'Necoro' Neumann Date: Sat, 25 Apr 2020 11:27:34 +0200 Subject: Improved caching --- internal/cache/cache.go | 120 ------------------------- internal/feed/cache.go | 230 ++++++++++++++++++++++++++++++++++++++++++++++++ internal/feed/feed.go | 64 ++++++++++++-- internal/feed/parse.go | 24 ++--- internal/yaml/yaml.go | 15 ++-- main.go | 23 ++--- 6 files changed, 315 insertions(+), 161 deletions(-) delete mode 100644 internal/cache/cache.go create mode 100644 internal/feed/cache.go diff --git a/internal/cache/cache.go b/internal/cache/cache.go deleted file mode 100644 index 979d661..0000000 --- a/internal/cache/cache.go +++ /dev/null @@ -1,120 +0,0 @@ -package cache - -import ( - "bufio" - "encoding/gob" - "errors" - "fmt" - "os" - - "github.com/Necoro/feed2imap-go/internal/log" -) - -const currentVersion byte = 1 - -type Cache interface { - Version() byte - transformToCurrent() (Cache, error) -} - -type feedId struct { - Name string - Url string -} - -type v1Cache struct { - version byte - Ids map[feedId]uint64 - NextId uint64 -} - -func (cache *v1Cache) Version() byte { - return cache.version -} - -func New() Cache { - cache := v1Cache{Ids: map[feedId]uint64{}} - cache.version = currentVersion - return &cache -} - -func cacheForVersion(version byte) (Cache, error) { - switch version { - case 1: - return New(), nil - default: - return nil, fmt.Errorf("unknown cache version '%d'", version) - } -} - -func (cache *v1Cache) transformToCurrent() (Cache, error) { - return cache, nil -} - -func Store(fileName string, cache Cache) error { - if cache == nil { - return fmt.Errorf("trying to store nil cache") - } - if cache.Version() != currentVersion { - return fmt.Errorf("trying to store cache with unsupported version '%d' (current: '%d')", cache.Version(), currentVersion) - } - - f, err := os.Create(fileName) - if err != nil { - return fmt.Errorf("trying to store cache to '%s': %w", fileName, 
err) - } - defer f.Close() - - writer := bufio.NewWriter(f) - if err = writer.WriteByte(currentVersion); err != nil { - return fmt.Errorf("writing to '%s': %w", fileName, err) - } - - encoder := gob.NewEncoder(writer) - if err = encoder.Encode(cache); err != nil { - return fmt.Errorf("encoding cache: %w", err) - } - - writer.Flush() - log.Printf("Stored cache to '%s'.", fileName) - - return nil -} - -func Read(fileName string) (Cache, error) { - f, err := os.Open(fileName) - if err != nil { - if errors.Is(err, os.ErrNotExist) { - // no cache there yet -- make new - return New(), nil - } - return nil, fmt.Errorf("opening cache at '%s': %w", fileName, err) - } - defer f.Close() - - log.Printf("Loading cache from '%s'", fileName) - - reader := bufio.NewReader(f) - version, err := reader.ReadByte() - if err != nil { - return nil, fmt.Errorf("reading from '%s': %w", fileName, err) - } - - cache, err := cacheForVersion(version) - if err != nil { - return nil, err - } - - decoder := gob.NewDecoder(reader) - if err = decoder.Decode(cache); err != nil { - return nil, fmt.Errorf("decoding for version '%d' from '%s': %w", version, fileName, err) - } - - if cache, err = cache.transformToCurrent(); err != nil { - return nil, fmt.Errorf("cannot transform from version %d to %d: %w", version, currentVersion, err) - } - - log.Printf("Loaded cache (version %d), transformed to version %d.", version, currentVersion) - - return cache, nil -} diff --git a/internal/feed/cache.go b/internal/feed/cache.go new file mode 100644 index 0000000..4f27144 --- /dev/null +++ b/internal/feed/cache.go @@ -0,0 +1,230 @@ +package feed + +import ( + "bufio" + "crypto/sha256" + "encoding/gob" + "errors" + "fmt" + "os" + "time" + + "github.com/Necoro/feed2imap-go/internal/log" +) + +const ( + currentVersion byte = 1 + startFeedId uint64 = 1 +) + +type Cache interface { + findItem(*Feed) CachedFeed + Version() byte + transformToCurrent() (Cache, error) +} + +type feedId uint64 + +type feedDescriptor struct 
// CachedFeed is the view of a feed's cache entry that is handed out to the
// rest of the package: it records the outcome of a check and exposes the
// number of consecutive failed checks.
type CachedFeed interface {
	// Checked records that a check just happened and whether it failed.
	Checked(withFailure bool)
	// Failures returns the current count of consecutive failed checks.
	Failures() uint
}

// cachedFeed is the persisted per-feed state. Its fields are exported so that
// they take part in serialization.
type cachedFeed struct {
	LastCheck   time.Time
	NumFailures uint // cannot be called `Failures`: that name is taken by the interface method
	Items       []cachedItem
}

// Checked stamps the entry with the current time. A failed check bumps the
// failure counter; a successful one resets it to zero.
func (cf *cachedFeed) Checked(withFailure bool) {
	cf.LastCheck = time.Now()

	if !withFailure {
		cf.NumFailures = 0
		return
	}
	cf.NumFailures++
}

// Failures returns the number of failed checks since the last successful one.
func (cf *cachedFeed) Failures() uint {
	return cf.NumFailures
}

// itemHash has exactly the size of a SHA-256 digest.
type itemHash [sha256.Size]byte

// cachedItem is the persisted record of a single feed entry.
type cachedItem struct {
	Uid     string
	Title   string
	Link    string
	Date    time.Time
	Updated time.Time
	Creator string
	Hash    itemHash
}
Updating.", + fId.Name, fId.Url, otherId.Url) + changed = true + break + } else if otherId.Url == fId.Url { + log.Warnf("Feed with URL '%s' seems to have changed its name: New '%s', old '%s'. Updating", + fId.Url, fId.Name, otherId.Name) + changed = true + break + } + } + if changed { + delete(cache.Ids, otherId) + } else { + id = feedId(cache.NextId) + cache.NextId++ + } + + cache.Ids[fId] = id + } + + item := cache.getItem(id) + feed.cached = item + return item +} + +func (feeds *Feeds) StoreCache(fileName string) error { + cache := feeds.cache + if cache == nil { + return fmt.Errorf("trying to store nil cache") + } + if cache.Version() != currentVersion { + return fmt.Errorf("trying to store cache with unsupported version '%d' (current: '%d')", cache.Version(), currentVersion) + } + + f, err := os.Create(fileName) + if err != nil { + return fmt.Errorf("trying to store cache to '%s': %w", fileName, err) + } + defer f.Close() + + writer := bufio.NewWriter(f) + if err = writer.WriteByte(currentVersion); err != nil { + return fmt.Errorf("writing to '%s': %w", fileName, err) + } + + encoder := gob.NewEncoder(writer) + if err = encoder.Encode(cache); err != nil { + return fmt.Errorf("encoding cache: %w", err) + } + + writer.Flush() + log.Printf("Stored cache to '%s'.", fileName) + + return nil +} + +func (feeds *Feeds) LoadCache(fileName string) error { + cache, err := loadCache(fileName) + if err != nil { + return err + } + feeds.cache = cache + + for _, feed := range feeds.feeds { + feed.cached = cache.findItem(feed) + } + return nil +} + +func loadCache(fileName string) (Cache, error) { + f, err := os.Open(fileName) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + // no cache there yet -- make new + return New(), nil + } + return nil, fmt.Errorf("opening cache at '%s': %w", fileName, err) + } + defer f.Close() + + log.Printf("Loading cache from '%s'", fileName) + + reader := bufio.NewReader(f) + version, err := reader.ReadByte() + if err != nil { + return 
nil, fmt.Errorf("reading from '%s': %w", fileName, err) + } + + cache, err := cacheForVersion(version) + if err != nil { + return nil, err + } + + decoder := gob.NewDecoder(reader) + if err = decoder.Decode(cache); err != nil { + return nil, fmt.Errorf("decoding for version '%d' from '%s': %w", version, fileName, err) + } + + if cache, err = cache.transformToCurrent(); err != nil { + return nil, fmt.Errorf("cannot transform from version %d to %d: %w", version, currentVersion, err) + } + + log.Printf("Loaded cache (version %d), transformed to version %d.", version, currentVersion) + + return cache, nil +} diff --git a/internal/feed/feed.go b/internal/feed/feed.go index cd906a2..5af4188 100644 --- a/internal/feed/feed.go +++ b/internal/feed/feed.go @@ -3,10 +3,13 @@ package feed import ( "fmt" "strings" + "sync" + "time" "github.com/mmcdole/gofeed" "github.com/Necoro/feed2imap-go/internal/config" + "github.com/Necoro/feed2imap-go/internal/log" ) type Feed struct { @@ -14,8 +17,9 @@ type Feed struct { Target []string Url string config.Options - feed *gofeed.Feed - items []feeditem + feed *gofeed.Feed + items []feeditem + cached CachedFeed } type feeditem struct { @@ -23,9 +27,18 @@ type feeditem struct { *gofeed.Item } -type Feeds map[string]*Feed +type Feeds struct { + feeds map[string]*Feed + cache Cache +} + +func NewFeeds() *Feeds { + return &Feeds{ + feeds: map[string]*Feed{}, + } +} -func (f Feeds) String() string { +func (feeds *Feeds) String() string { var b strings.Builder app := func(a ...interface{}) { _, _ = fmt.Fprint(&b, a...) 
@@ -33,7 +46,7 @@ func (f Feeds) String() string { app("Feeds [") first := true - for k, v := range f { + for k, v := range feeds.feeds { if !first { app(", ") } @@ -49,3 +62,44 @@ func (f Feeds) String() string { return b.String() } + +func (feeds *Feeds) Len() int { + return len(feeds.feeds) +} + +func (feeds *Feeds) Contains(name string) bool { + _, ok := feeds.feeds[name] + return ok +} + +func (feeds *Feeds) Set(name string, feed *Feed) { + feeds.feeds[name] = feed +} + +func (feeds *Feeds) Foreach(f func(*Feed)) { + for _, feed := range feeds.feeds { + f(feed) + } +} + +func (feeds *Feeds) ForeachGo(goFunc func(*Feed, *sync.WaitGroup)) { + var wg sync.WaitGroup + wg.Add(feeds.Len()) + + for _, feed := range feeds.feeds { + go goFunc(feed, &wg) + } + wg.Wait() +} + +func (feed *Feed) NeedsUpdate(updateTime time.Time) bool { + if !updateTime.IsZero() && int(time.Since(updateTime).Hours()) >= feed.MinFreq { + log.Printf("Feed '%s' does not need updating, skipping.", feed.Name) + return false + } + return true +} + +func (feed *Feed) Success() bool { + return feed.feed != nil +} diff --git a/internal/feed/parse.go b/internal/feed/parse.go index 35a7596..6deebb2 100644 --- a/internal/feed/parse.go +++ b/internal/feed/parse.go @@ -32,7 +32,7 @@ func parseFeed(feed *Feed) error { return nil } -func handleFeed(feed *Feed, group *sync.WaitGroup, success chan<- bool) { +func handleFeed(feed *Feed, group *sync.WaitGroup) { defer group.Done() log.Printf("Fetching %s from %s", feed.Name, feed.Url) @@ -40,25 +40,17 @@ func handleFeed(feed *Feed, group *sync.WaitGroup, success chan<- bool) { if err != nil { log.Error(err) } - success <- err == nil } -func Parse(feeds Feeds) int { - var wg sync.WaitGroup - wg.Add(len(feeds)) - - success := make(chan bool, len(feeds)) - - for _, feed := range feeds { - go handleFeed(feed, &wg, success) - } - - wg.Wait() - close(success) +func (feeds Feeds) Parse() int { + feeds.ForeachGo(handleFeed) ctr := 0 - for s := range success { - if s 
{ + for _, feed := range feeds.feeds { + success := feed.Success() + feed.cached.Checked(!success) + + if success { ctr++ } } diff --git a/internal/yaml/yaml.go b/internal/yaml/yaml.go index 9dc2615..23a38ef 100644 --- a/internal/yaml/yaml.go +++ b/internal/yaml/yaml.go @@ -8,6 +8,7 @@ import ( C "github.com/Necoro/feed2imap-go/internal/config" F "github.com/Necoro/feed2imap-go/internal/feed" + "github.com/Necoro/feed2imap-go/internal/log" ) type config struct { @@ -77,7 +78,7 @@ func appTarget(target []string, app string) []string { } // Parse the group structure and populate the `Target` fields in the feeds -func buildFeeds(cfg []configGroupFeed, target []string, feeds F.Feeds) error { +func buildFeeds(cfg []configGroupFeed, target []string, feeds *F.Feeds) error { for idx := range cfg { f := &cfg[idx] // cannot use `_, f := range cfg` as it returns copies(!), but we need the originals target := appTarget(target, f.target()) @@ -91,15 +92,15 @@ func buildFeeds(cfg []configGroupFeed, target []string, feeds F.Feeds) error { return fmt.Errorf("Unnamed feed") } - if _, ok := feeds[name]; ok { + if feeds.Contains(name) { return fmt.Errorf("Duplicate Feed Name '%s'", name) } - feeds[name] = &F.Feed{ + feeds.Set(name, &F.Feed{ Name: f.Feed.Name, Target: target, Url: f.Feed.Url, Options: f.Feed.Options, - } + }) case f.isGroup(): if err := buildFeeds(f.Group.Feeds, target, feeds); err != nil { @@ -111,7 +112,9 @@ func buildFeeds(cfg []configGroupFeed, target []string, feeds F.Feeds) error { return nil } -func Load(path string) (*C.Config, F.Feeds, error) { +func Load(path string) (*C.Config, *F.Feeds, error) { + log.Printf("Reading configuration file '%s'", path) + buf, err := ioutil.ReadFile(path) if err != nil { return nil, nil, fmt.Errorf("while reading '%s': %w", path, err) @@ -122,7 +125,7 @@ func Load(path string) (*C.Config, F.Feeds, error) { return nil, nil, err } - feeds := F.Feeds{} + feeds := F.NewFeeds() if err := buildFeeds(parsedCfg.Feeds, []string{}, 
feeds); err != nil { return nil, nil, fmt.Errorf("while parsing: %w", err) diff --git a/main.go b/main.go index c4afc11..1850710 100644 --- a/main.go +++ b/main.go @@ -7,7 +7,6 @@ import ( "os" "sync" - "github.com/Necoro/feed2imap-go/internal/cache" "github.com/Necoro/feed2imap-go/internal/config" "github.com/Necoro/feed2imap-go/internal/feed" "github.com/Necoro/feed2imap-go/internal/imap" @@ -52,7 +51,6 @@ func run() error { log.Print("Starting up...") - log.Printf("Reading configuration file '%s'", *cfgFile) cfg, feeds, err := yaml.Load(*cfgFile) if err != nil { return err @@ -62,15 +60,15 @@ func run() error { return fmt.Errorf("Configuration invalid: %w", err) } - if success := feed.Parse(feeds); success == 0 { - return fmt.Errorf("No successfull feed fetch.") - } - - feedCache, err := cache.Read(*cacheFile) + err = feeds.LoadCache(*cacheFile) if err != nil { return err } + if success := feeds.Parse(); success == 0 { + return fmt.Errorf("No successfull feed fetch.") + } + imapUrl, err := url.Parse(cfg.Target) if err != nil { return fmt.Errorf("parsing 'target': %w", err) @@ -83,14 +81,11 @@ func run() error { defer c.Disconnect() - var wg sync.WaitGroup - wg.Add(len(feeds)) - for _, f := range feeds { - go processFeed(f, cfg, c, &wg) - } - wg.Wait() + feeds.ForeachGo(func(f *feed.Feed, wg *sync.WaitGroup) { + processFeed(f, cfg, c, wg) + }) - if err = cache.Store(*cacheFile, feedCache); err != nil { + if err = feeds.StoreCache(*cacheFile); err != nil { return err } -- cgit v1.2.3-70-g09d2