diff options
author | René 'Necoro' Neumann <necoro@necoro.eu> | 2021-02-22 22:54:43 +0100 |
---|---|---|
committer | René 'Necoro' Neumann <necoro@necoro.eu> | 2021-02-22 22:54:43 +0100 |
commit | 7106d5a6e7585dce5fdd552cca30063dd352dc23 (patch) | |
tree | 88a9cb7150f86fcadb8f87e3a6d0892bf68c2251 /internal/feed/cache | |
parent | fb2aa9b1f04d509c8215c1fa6505a144482c343d (diff) | |
download | feed2imap-go-7106d5a6e7585dce5fdd552cca30063dd352dc23.tar.gz feed2imap-go-7106d5a6e7585dce5fdd552cca30063dd352dc23.tar.bz2 feed2imap-go-7106d5a6e7585dce5fdd552cca30063dd352dc23.zip |
Split cache and feed packages
Diffstat (limited to '')
-rw-r--r-- | internal/feed/cache/cache.go (renamed from internal/feed/cache.go) | 31 | ||||
-rw-r--r-- | internal/feed/cache/cache_v1.go (renamed from internal/feed/cache_v1.go) | 63 | ||||
-rw-r--r-- | internal/feed/cache/state.go | 146 |
3 files changed, 196 insertions, 44 deletions
diff --git a/internal/feed/cache.go b/internal/feed/cache/cache.go index 5f5c394..e7fe673 100644 --- a/internal/feed/cache.go +++ b/internal/feed/cache/cache.go @@ -1,4 +1,4 @@ -package feed +package cache import ( "bufio" @@ -11,6 +11,7 @@ import ( "github.com/nightlyone/lockfile" + "github.com/Necoro/feed2imap-go/internal/feed" "github.com/Necoro/feed2imap-go/pkg/log" ) @@ -20,30 +21,36 @@ const ( currentVersion Version = 1 ) -type CacheImpl interface { - findItem(*Feed) CachedFeed +type Impl interface { + cachedFeed(*feed.Feed) CachedFeed + transformToCurrent() (Impl, error) Version() Version Info() string SpecificInfo(interface{}) string - transformToCurrent() (CacheImpl, error) } type Cache struct { - CacheImpl + Impl lock lockfile.Lockfile locked bool } type CachedFeed interface { + // Checked marks the feed as being a failure or a success on last check. Checked(withFailure bool) + // Failures of this feed up to now. Failures() int + // The Last time, this feed has been checked Last() time.Time - ID() string - filterItems(items []item, ignoreHash bool, alwaysNew bool) []item + // Filter the given items against the cached items. + Filter(items []feed.Item, ignoreHash bool, alwaysNew bool) []feed.Item + // Commit any changes done to the cache state. Commit() + // The Feed, that is cached. + Feed() *feed.Feed } -func cacheForVersion(version Version) (CacheImpl, error) { +func cacheForVersion(version Version) (Impl, error) { switch version { case v1Version: return newV1Cache(), nil @@ -78,7 +85,7 @@ func lock(fileName string) (lock lockfile.Lockfile, err error) { } func (cache *Cache) store(fileName string) error { - if cache.CacheImpl == nil { + if cache.Impl == nil { return fmt.Errorf("trying to store nil cache") } if cache.Version() != currentVersion { @@ -97,7 +104,7 @@ func (cache *Cache) store(fileName string) error { } encoder := gob.NewEncoder(writer) - if err = encoder.Encode(cache.CacheImpl); err != nil { + if err = encoder.Encode(cache.Impl); err != nil { return fmt.Errorf("encoding cache: %w", err) } @@ -123,8 +130,8 @@ func newCache() (Cache, error) { return Cache{}, err } return Cache{ - CacheImpl: cache, - locked: false, + Impl: cache, + locked: false, }, nil } diff --git a/internal/feed/cache_v1.go b/internal/feed/cache/cache_v1.go index a80e81c..d754b00 100644 --- a/internal/feed/cache_v1.go +++ b/internal/feed/cache/cache_v1.go @@ -1,4 +1,4 @@ -package feed +package cache import ( "crypto/sha256" @@ -11,6 +11,7 @@ import ( "github.com/google/uuid" + "github.com/Necoro/feed2imap-go/internal/feed" "github.com/Necoro/feed2imap-go/pkg/log" "github.com/Necoro/feed2imap-go/pkg/util" ) @@ -32,12 +33,13 @@ func idFromString(s string) feedId { } type v1Cache struct { - Ids map[feedDescriptor]feedId + Ids map[feed.Descriptor]feedId NextId uint64 Feeds map[feedId]*cachedFeed } type cachedFeed struct { + feed *feed.Feed id feedId // not saved, has to be set on loading LastCheck time.Time currentCheck time.Time @@ -100,8 +102,8 @@ func (cf *cachedFeed) Last() time.Time { return cf.LastCheck } -func (cf *cachedFeed) ID() string { - return cf.id.String() +func (cf *cachedFeed) Feed() *feed.Feed { + return cf.feed } func (cache *v1Cache) Version() Version { @@ -150,18 +152,18 @@ Num Items: %d func newV1Cache() *v1Cache { cache := v1Cache{ - Ids: map[feedDescriptor]feedId{}, + Ids: map[feed.Descriptor]feedId{}, Feeds: map[feedId]*cachedFeed{}, NextId: startFeedId, } return &cache } -func (cache *v1Cache) transformToCurrent() (CacheImpl, error) { +func (cache *v1Cache) transformToCurrent() (Impl, error) { return cache, nil } -func (cache *v1Cache) getItem(id feedId) CachedFeed { +func (cache *v1Cache) getItem(id feedId) *cachedFeed { feed, ok := cache.Feeds[id] if !ok { feed = &cachedFeed{} @@ -171,15 +173,11 @@ func (cache *v1Cache) getItem(id feedId) CachedFeed { return feed } -func (cache *v1Cache) findItem(feed *Feed) CachedFeed { - if feed.cached != nil { - return feed.cached.(*cachedFeed) - } - - fDescr := feed.descriptor() +func (cache *v1Cache) cachedFeed(f *feed.Feed) CachedFeed { + fDescr := f.Descriptor() id, ok := cache.Ids[fDescr] if !ok { - var otherId feedDescriptor + var otherId feed.Descriptor changed := false for otherId, id = range cache.Ids { if otherId.Name == fDescr.Name { @@ -204,15 +202,16 @@ func (cache *v1Cache) findItem(feed *Feed) CachedFeed { cache.Ids[fDescr] = id } - item := cache.getItem(id) - feed.cached = item - return item + cf := cache.getItem(id) + cf.feed = f + f.SetExtID(id) + return cf } -func (item *item) newCachedItem() cachedItem { +func newCachedItem(item *feed.Item) cachedItem { var ci cachedItem - ci.ID = item.itemId + ci.ID = item.ID ci.Title = item.Item.Title ci.Link = item.Item.Link if item.DateParsed() != nil { @@ -239,27 +238,27 @@ func (cf *cachedFeed) deleteItem(index int) { cf.Items = cf.Items[:len(cf.Items)-1] } -func (cf *cachedFeed) filterItems(items []item, ignoreHash, alwaysNew bool) []item { +func (cf *cachedFeed) Filter(items []feed.Item, ignoreHash, alwaysNew bool) []feed.Item { if len(items) == 0 { return items } - cacheItems := make(map[cachedItem]*item, len(items)) + cacheItems := make(map[cachedItem]*feed.Item, len(items)) for idx := range items { // remove complete duplicates on the go - cacheItems[items[idx].newCachedItem()] = &items[idx] + cacheItems[newCachedItem(&items[idx])] = &items[idx] } log.Debugf("%d items after deduplication", len(cacheItems)) - filtered := make([]item, 0, len(items)) + filtered := make([]feed.Item, 0, len(items)) cacheadd := make([]cachedItem, 0, len(items)) - app := func(item *item, ci cachedItem, oldIdx *int) { + app := func(item *feed.Item, ci cachedItem, oldIdx *int) { if oldIdx != nil { - item.updateOnly = true + item.UpdateOnly = true prevId := cf.Items[*oldIdx].ID ci.ID = prevId - item.itemId = prevId - log.Debugf("oldIdx: %d, prevId: %s, item.id: %s", *oldIdx, prevId, item.id()) + item.ID = prevId + log.Debugf("oldIdx: %d, prevId: %s, item.id: %s", *oldIdx, prevId, item.Id()) cf.deleteItem(*oldIdx) } filtered = append(filtered, *item) @@ -275,7 +274,7 @@ CACHE_ITEMS: if oldItem.Guid == ci.Guid { log.Debugf("Guid matches with: %s", oldItem) if !oldItem.similarTo(&ci, ignoreHash) { - item.addReason("guid (upd)") + item.AddReason("guid (upd)") app(item, ci, &idx) } else { log.Debugf("Similar, ignoring item %s", base64.RawURLEncoding.EncodeToString(oldItem.ID[:])) @@ -286,7 +285,7 @@ CACHE_ITEMS: } log.Debug("Found no matching GUID, including.") - item.addReason("guid") + item.AddReason("guid") app(item, ci, nil) continue } @@ -300,11 +299,11 @@ CACHE_ITEMS: if oldItem.Link == ci.Link { if alwaysNew { log.Debugf("Link matches, but `always-new`.") - item.addReason("always-new") + item.AddReason("always-new") continue } log.Debugf("Link matches, updating: %s", oldItem) - item.addReason("link (upd)") + item.AddReason("link (upd)") app(item, ci, &idx) continue CACHE_ITEMS @@ -312,7 +311,7 @@ CACHE_ITEMS: } log.Debugf("No match found, inserting.") - item.addReason("new") + item.AddReason("new") app(item, ci, nil) } diff --git a/internal/feed/cache/state.go b/internal/feed/cache/state.go new file mode 100644 index 0000000..c2e7de8 --- /dev/null +++ b/internal/feed/cache/state.go @@ -0,0 +1,146 @@ +package cache + +import ( + "sync" + + "github.com/Necoro/feed2imap-go/internal/feed" + "github.com/Necoro/feed2imap-go/pkg/config" + "github.com/Necoro/feed2imap-go/pkg/log" +) + +type State struct { + feeds map[string]*feed.Feed + cachedFeeds map[string]CachedFeed + cache Cache + cfg *config.Config +} + +func (state *State) Foreach(f func(CachedFeed)) { + for _, feed := range state.cachedFeeds { + f(feed) + } +} + +func (state *State) ForeachGo(goFunc func(CachedFeed)) { + var wg sync.WaitGroup + wg.Add(len(state.feeds)) + + f := func(feed CachedFeed, wg *sync.WaitGroup) { + goFunc(feed) + wg.Done() + } + + for _, feed := range state.cachedFeeds { + go f(feed, &wg) + } + wg.Wait() +} + +func (state *State) LoadCache(fileName string, forceNew bool) error { + var ( + cache Cache + err error + ) + + if forceNew { + cache, err = newCache() + } else { + cache, err = LoadCache(fileName) + } + + if err != nil { + return err + } + state.cache = cache + + for name, feed := range state.feeds { + state.cachedFeeds[name] = cache.cachedFeed(feed) + } + + // state.feeds should not be used after loading the cache --> enforce a panic + state.feeds = nil + + return nil +} + +func (state *State) StoreCache(fileName string) error { + return state.cache.store(fileName) +} + +func (state *State) UnlockCache() { + _ = state.cache.Unlock() +} + +func (state *State) Fetch() int { + state.ForeachGo(handleFeed) + + ctr := 0 + for _, cf := range state.cachedFeeds { + success := cf.Feed().FetchSuccessful() + cf.Checked(!success) + + if success { + ctr++ + } + } + + return ctr +} + +func handleFeed(cf CachedFeed) { + feed := cf.Feed() + log.Printf("Fetching %s from %s", feed.Name, feed.Url) + + err := feed.Parse() + if err != nil { + if feed.Url == "" || cf.Failures() >= feed.Global.MaxFailures { + log.Error(err) + } else { + log.Print(err) + } + } +} + +func filterFeed(cf CachedFeed) { + cf.Feed().Filter(cf.Filter) +} + +func (state *State) Filter() { + if log.IsDebug() { + // single threaded for better output + state.Foreach(filterFeed) + } else { + state.ForeachGo(filterFeed) + } +} + +func NewState(cfg *config.Config) (*State, error) { + state := State{ + feeds: map[string]*feed.Feed{}, + cachedFeeds: map[string]CachedFeed{}, + cache: Cache{}, // loaded later on + cfg: cfg, + } + + for name, parsedFeed := range cfg.Feeds { + feed, err := feed.Create(parsedFeed, cfg.GlobalOptions) + if err != nil { + return nil, err + } + state.feeds[name] = feed + } + + return &state, nil +} + +func (state *State) RemoveUndue() { + for name, feed := range state.cachedFeeds { + if feed.Feed().Disable || !feed.Feed().NeedsUpdate(feed.Last()) { + delete(state.cachedFeeds, name) + } + } +} + +func (state *State) NumFeeds() int { + return len(state.cachedFeeds) +} |