Diffstat (limited to 'internal/feed/cache')
-rw-r--r--  internal/feed/cache/cache.go    | 179
-rw-r--r--  internal/feed/cache/cache_v1.go | 323
-rw-r--r--  internal/feed/cache/state.go    | 146
3 files changed, 648 insertions(+), 0 deletions(-)
diff --git a/internal/feed/cache/cache.go b/internal/feed/cache/cache.go
new file mode 100644
index 0000000..e7fe673
--- /dev/null
+++ b/internal/feed/cache/cache.go
@@ -0,0 +1,179 @@
+package cache
+
+import (
+ "bufio"
+ "encoding/gob"
+ "errors"
+ "fmt"
+ "os"
+ "path/filepath"
+ "time"
+
+ "github.com/nightlyone/lockfile"
+
+ "github.com/Necoro/feed2imap-go/internal/feed"
+ "github.com/Necoro/feed2imap-go/pkg/log"
+)
+
+type Version byte
+
+const (
+ currentVersion Version = 1
+)
+
+type Impl interface {
+ cachedFeed(*feed.Feed) CachedFeed
+ transformToCurrent() (Impl, error)
+ Version() Version
+ Info() string
+ SpecificInfo(interface{}) string
+}
+
+type Cache struct {
+ Impl
+ lock lockfile.Lockfile
+ locked bool
+}
+
+type CachedFeed interface {
+ // Checked marks the feed as a failure or a success on its last check.
+ Checked(withFailure bool)
+ // Failures returns the number of failures of this feed up to now.
+ Failures() int
+ // Last returns the time this feed was last checked.
+ Last() time.Time
+ // Filter filters the given items against the cached items.
+ Filter(items []feed.Item, ignoreHash bool, alwaysNew bool) []feed.Item
+ // Commit commits any changes done to the cache state.
+ Commit()
+ // Feed returns the feed that is cached.
+ Feed() *feed.Feed
+}
+
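+// cacheForVersion returns an empty cache implementation for the given
+// version.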
+func cacheForVersion(version Version) (Impl, error) {
+ switch version {
+ case v1Version:
+ return newV1Cache(), nil
+ default:
+ return nil, fmt.Errorf("unknown cache version '%d'", version)
+ }
+}
+
+func lockName(fileName string) (string, error) {
+ return filepath.Abs(fileName + ".lck")
+}
+
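+// lock acquires the lock file belonging to the cache file and fails if it is
+// already held by another process.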
+func lock(fileName string) (lock lockfile.Lockfile, err error) {
+ var lockFile string
+
+ if lockFile, err = lockName(fileName); err != nil {
+ return
+ }
+ log.Debugf("Handling lock file '%s'", lockFile)
+
+ if lock, err = lockfile.New(lockFile); err != nil {
+ err = fmt.Errorf("Creating lock file: %w", err)
+ return
+ }
+
+ if err = lock.TryLock(); err != nil {
+ err = fmt.Errorf("Locking cache: %w", err)
+ return
+ }
+
+ return
+}
+
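+// store writes a single version byte followed by the gob-encoded cache
+// implementation to fileName and releases the lock afterwards.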
+func (cache *Cache) store(fileName string) error {
+ if cache.Impl == nil {
+ return fmt.Errorf("trying to store nil cache")
+ }
+ if cache.Version() != currentVersion {
+ return fmt.Errorf("trying to store cache with unsupported version '%d' (current: '%d')", cache.Version(), currentVersion)
+ }
+
+ f, err := os.Create(fileName)
+ if err != nil {
+ return fmt.Errorf("trying to store cache to '%s': %w", fileName, err)
+ }
+ defer f.Close()
+
+ writer := bufio.NewWriter(f)
+ if err = writer.WriteByte(byte(currentVersion)); err != nil {
+ return fmt.Errorf("writing to '%s': %w", fileName, err)
+ }
+
+ encoder := gob.NewEncoder(writer)
+ if err = encoder.Encode(cache.Impl); err != nil {
+ return fmt.Errorf("encoding cache: %w", err)
+ }
+
+ if err = writer.Flush(); err != nil {
+ return fmt.Errorf("flushing '%s': %w", fileName, err)
+ }
+ log.Printf("Stored cache to '%s'.", fileName)
+
+ return cache.Unlock()
+}
+
+func (cache *Cache) Unlock() error {
+ if cache.locked {
+ if err := cache.lock.Unlock(); err != nil {
+ return fmt.Errorf("Unlocking cache: %w", err)
+ }
+ }
+ cache.locked = false
+ return nil
+}
+
+func newCache() (Cache, error) {
+ cache, err := cacheForVersion(currentVersion)
+ if err != nil {
+ return Cache{}, err
+ }
+ return Cache{
+ Impl: cache,
+ locked: false,
+ }, nil
+}
+
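+// LoadCache reads the cache from fileName, creating a fresh cache if the file
+// does not exist yet. The on-disk format is a version byte followed by the
+// gob-encoded data; older versions are transformed to the current one.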
+func LoadCache(fileName string) (Cache, error) {
+ f, err := os.Open(fileName)
+ if err != nil {
+ if errors.Is(err, os.ErrNotExist) {
+ // no cache there yet -- make new
+ return newCache()
+ }
+ return Cache{}, fmt.Errorf("opening cache at '%s': %w", fileName, err)
+ }
+ defer f.Close()
+
+ lock, err := lock(fileName)
+ if err != nil {
+ return Cache{}, err
+ }
+
+ log.Printf("Loading cache from '%s'", fileName)
+
+ reader := bufio.NewReader(f)
+ version, err := reader.ReadByte()
+ if err != nil {
+ return Cache{}, fmt.Errorf("reading from '%s': %w", fileName, err)
+ }
+
+ cache, err := cacheForVersion(Version(version))
+ if err != nil {
+ return Cache{}, err
+ }
+
+ decoder := gob.NewDecoder(reader)
+ if err = decoder.Decode(cache); err != nil {
+ return Cache{}, fmt.Errorf("decoding for version '%d' from '%s': %w", version, fileName, err)
+ }
+
+ if cache, err = cache.transformToCurrent(); err != nil {
+ return Cache{}, fmt.Errorf("cannot transform from version %d to %d: %w", version, currentVersion, err)
+ }
+
+ log.Printf("Loaded cache (version %d), transformed to version %d.", version, currentVersion)
+
+ return Cache{cache, lock, true}, nil
+}
diff --git a/internal/feed/cache/cache_v1.go b/internal/feed/cache/cache_v1.go
new file mode 100644
index 0000000..d754b00
--- /dev/null
+++ b/internal/feed/cache/cache_v1.go
@@ -0,0 +1,323 @@
+package cache
+
+import (
+ "crypto/sha256"
+ "encoding/base64"
+ "encoding/hex"
+ "fmt"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/google/uuid"
+
+ "github.com/Necoro/feed2imap-go/internal/feed"
+ "github.com/Necoro/feed2imap-go/pkg/log"
+ "github.com/Necoro/feed2imap-go/pkg/util"
+)
+
+const (
+ v1Version Version = 1
+ startFeedId uint64 = 1
+)
+
+type feedId uint64
+
+func (id feedId) String() string {
+ return strconv.FormatUint(uint64(id), 16)
+}
+
+func idFromString(s string) feedId {
+ id, _ := strconv.ParseUint(s, 16, 64)
+ return feedId(id)
+}
+
+type v1Cache struct {
+ Ids map[feed.Descriptor]feedId
+ NextId uint64
+ Feeds map[feedId]*cachedFeed
+}
+
+type cachedFeed struct {
+ feed *feed.Feed
+ id feedId // not saved, has to be set on loading
+ LastCheck time.Time
+ currentCheck time.Time
+ NumFailures int // cannot be named `Failures` because it would collide with the interface method
+ Items []cachedItem
+ newItems []cachedItem
+}
+
+type itemHash [sha256.Size]byte
+
+func (h itemHash) String() string {
+ return hex.EncodeToString(h[:])
+}
+
+type cachedItem struct {
+ Guid string
+ Title string
+ Link string
+ Date time.Time
+ UpdatedCache time.Time
+ Hash itemHash
+ ID uuid.UUID
+}
+
+func (item cachedItem) String() string {
+ return fmt.Sprintf(`{
+ ID: %s
+ Title: %q
+ Guid: %q
+ Link: %q
+ Date: %s
+ Hash: %s
+}`,
+ base64.RawURLEncoding.EncodeToString(item.ID[:]),
+ item.Title, item.Guid, item.Link, util.TimeFormat(item.Date), item.Hash)
+}
+
+func (cf *cachedFeed) Checked(withFailure bool) {
+ cf.currentCheck = time.Now()
+ if withFailure {
+ cf.NumFailures++
+ } else {
+ cf.NumFailures = 0
+ }
+}
+
+func (cf *cachedFeed) Commit() {
+ if cf.newItems != nil {
+ cf.Items = cf.newItems
+ cf.newItems = nil
+ }
+ cf.LastCheck = cf.currentCheck
+}
+
+func (cf *cachedFeed) Failures() int {
+ return cf.NumFailures
+}
+
+func (cf *cachedFeed) Last() time.Time {
+ return cf.LastCheck
+}
+
+func (cf *cachedFeed) Feed() *feed.Feed {
+ return cf.feed
+}
+
+func (cache *v1Cache) Version() Version {
+ return v1Version
+}
+
+func (cache *v1Cache) Info() string {
+ b := strings.Builder{}
+ for descr, id := range cache.Ids {
+ b.WriteString(fmt.Sprintf("%3s: %s (%s)\n", id.String(), descr.Name, descr.Url))
+ }
+ return b.String()
+}
+
+func (cache *v1Cache) SpecificInfo(i interface{}) string {
+ id := idFromString(i.(string))
+
+ b := strings.Builder{}
+ feed := cache.Feeds[id]
+
+ for descr, fId := range cache.Ids {
+ if id == fId {
+ b.WriteString(descr.Name)
+ b.WriteString(" -- ")
+ b.WriteString(descr.Url)
+ b.WriteByte('\n')
+ break
+ }
+ }
+
+ b.WriteString(fmt.Sprintf(`
+Last Check: %s
+Num Failures: %d
+Num Items: %d
+`,
+ util.TimeFormat(feed.LastCheck),
+ feed.NumFailures,
+ len(feed.Items)))
+
+ for _, item := range feed.Items {
+ b.WriteString("\n--------------------\n")
+ b.WriteString(item.String())
+ }
+ return b.String()
+}
+
+func newV1Cache() *v1Cache {
+ cache := v1Cache{
+ Ids: map[feed.Descriptor]feedId{},
+ Feeds: map[feedId]*cachedFeed{},
+ NextId: startFeedId,
+ }
+ return &cache
+}
+
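+// transformToCurrent is a no-op, because v1 is the current version.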
+func (cache *v1Cache) transformToCurrent() (Impl, error) {
+ return cache, nil
+}
+
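+// getItem fetches the cached feed for the given id, creating an empty entry
+// if none exists yet.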
+func (cache *v1Cache) getItem(id feedId) *cachedFeed {
+ feed, ok := cache.Feeds[id]
+ if !ok {
+ feed = &cachedFeed{}
+ cache.Feeds[id] = feed
+ }
+ feed.id = id
+ return feed
+}
+
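+// cachedFeed returns the cache entry for the given feed. If the descriptor is
+// unknown, it first checks whether a known feed has merely changed its URL or
+// its name and reuses that entry; otherwise a new id is allocated.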
+func (cache *v1Cache) cachedFeed(f *feed.Feed) CachedFeed {
+ fDescr := f.Descriptor()
+ id, ok := cache.Ids[fDescr]
+ if !ok {
+ var otherId feed.Descriptor
+ changed := false
+ for otherId, id = range cache.Ids {
+ if otherId.Name == fDescr.Name {
+ log.Warnf("Feed %s seems to have changed URLs: new '%s', old '%s'. Updating.",
+ fDescr.Name, fDescr.Url, otherId.Url)
+ changed = true
+ break
+ } else if otherId.Url == fDescr.Url {
+ log.Warnf("Feed with URL '%s' seems to have changed its name: new '%s', old '%s'. Updating.",
+ fDescr.Url, fDescr.Name, otherId.Name)
+ changed = true
+ break
+ }
+ }
+ if changed {
+ delete(cache.Ids, otherId)
+ } else {
+ id = feedId(cache.NextId)
+ cache.NextId++
+ }
+
+ cache.Ids[fDescr] = id
+ }
+
+ cf := cache.getItem(id)
+ cf.feed = f
+ f.SetExtID(id)
+ return cf
+}
+
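+// newCachedItem converts a feed item into its cache representation; the hash
+// is the SHA-256 of description plus content.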
+func newCachedItem(item *feed.Item) cachedItem {
+ var ci cachedItem
+
+ ci.ID = item.ID
+ ci.Title = item.Item.Title
+ ci.Link = item.Item.Link
+ if item.DateParsed() != nil {
+ ci.Date = *item.DateParsed()
+ }
+ ci.Guid = item.Item.GUID
+
+ contentByte := []byte(item.Item.Description + item.Item.Content)
+ ci.Hash = sha256.Sum256(contentByte)
+
+ return ci
+}
+
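+// similarTo reports whether both items agree on title, link, and date, and,
+// unless ignoreHash is set, on the content hash.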
+func (item *cachedItem) similarTo(other *cachedItem, ignoreHash bool) bool {
+ return other.Title == item.Title &&
+ other.Link == item.Link &&
+ other.Date.Equal(item.Date) &&
+ (ignoreHash || other.Hash == item.Hash)
+}
+
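+// deleteItem removes the item at the given index while preserving the order
+// of the remaining items.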
+func (cf *cachedFeed) deleteItem(index int) {
+ copy(cf.Items[index:], cf.Items[index+1:])
+ cf.Items[len(cf.Items)-1] = cachedItem{}
+ cf.Items = cf.Items[:len(cf.Items)-1]
+}
+
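+// Filter deduplicates the given items and matches them against the cached
+// items: items with a GUID are compared by GUID first; all others by
+// similarity (title, link, date, hash) and then by link. Matched but changed
+// items keep their previous UUID and are marked as updates; unmatched items
+// are treated as new. The merged result is staged in newItems until Commit.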
+func (cf *cachedFeed) Filter(items []feed.Item, ignoreHash, alwaysNew bool) []feed.Item {
+ if len(items) == 0 {
+ return items
+ }
+
+ cacheItems := make(map[cachedItem]*feed.Item, len(items))
+ for idx := range items {
+ // remove exact duplicates on the fly
+ cacheItems[newCachedItem(&items[idx])] = &items[idx]
+ }
+ log.Debugf("%d items after deduplication", len(cacheItems))
+
+ filtered := make([]feed.Item, 0, len(items))
+ cacheadd := make([]cachedItem, 0, len(items))
+ app := func(item *feed.Item, ci cachedItem, oldIdx *int) {
+ if oldIdx != nil {
+ item.UpdateOnly = true
+ prevId := cf.Items[*oldIdx].ID
+ ci.ID = prevId
+ item.ID = prevId
+ log.Debugf("oldIdx: %d, prevId: %s, item.id: %s", *oldIdx, prevId, item.Id())
+ cf.deleteItem(*oldIdx)
+ }
+ filtered = append(filtered, *item)
+ cacheadd = append(cacheadd, ci)
+ }
+
+CACHE_ITEMS:
+ for ci, item := range cacheItems {
+ log.Debugf("Now checking %s", ci)
+
+ if ci.Guid != "" {
+ for idx, oldItem := range cf.Items {
+ if oldItem.Guid == ci.Guid {
+ log.Debugf("Guid matches with: %s", oldItem)
+ if !oldItem.similarTo(&ci, ignoreHash) {
+ item.AddReason("guid (upd)")
+ app(item, ci, &idx)
+ } else {
+ log.Debugf("Similar, ignoring item %s", base64.RawURLEncoding.EncodeToString(oldItem.ID[:]))
+ }
+
+ continue CACHE_ITEMS
+ }
+ }
+
+ log.Debug("Found no matching GUID, including.")
+ item.AddReason("guid")
+ app(item, ci, nil)
+ continue
+ }
+
+ for idx, oldItem := range cf.Items {
+ if oldItem.similarTo(&ci, ignoreHash) {
+ log.Debugf("Similarity matches, ignoring: %s", oldItem)
+ continue CACHE_ITEMS
+ }
+
+ if oldItem.Link == ci.Link {
+ if alwaysNew {
+ log.Debugf("Link matches, but `always-new`.")
+ item.AddReason("always-new")
+ continue
+ }
+ log.Debugf("Link matches, updating: %s", oldItem)
+ item.AddReason("link (upd)")
+ app(item, ci, &idx)
+
+ continue CACHE_ITEMS
+ }
+ }
+
+ log.Debugf("No match found, inserting.")
+ item.AddReason("new")
+ app(item, ci, nil)
+ }
+
+ log.Debugf("%d items after filtering", len(filtered))
+
+ cf.newItems = append(cacheadd, cf.Items...)
+
+ return filtered
+}
diff --git a/internal/feed/cache/state.go b/internal/feed/cache/state.go
new file mode 100644
index 0000000..c2e7de8
--- /dev/null
+++ b/internal/feed/cache/state.go
@@ -0,0 +1,146 @@
+package cache
+
+import (
+ "sync"
+
+ "github.com/Necoro/feed2imap-go/internal/feed"
+ "github.com/Necoro/feed2imap-go/pkg/config"
+ "github.com/Necoro/feed2imap-go/pkg/log"
+)
+
+type State struct {
+ feeds map[string]*feed.Feed
+ cachedFeeds map[string]CachedFeed
+ cache Cache
+ cfg *config.Config
+}
+
+func (state *State) Foreach(f func(CachedFeed)) {
+ for _, feed := range state.cachedFeeds {
+ f(feed)
+ }
+}
+
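+// ForeachGo runs goFunc on every cached feed concurrently and waits for all
+// goroutines to finish.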
+func (state *State) ForeachGo(goFunc func(CachedFeed)) {
+ var wg sync.WaitGroup
+ wg.Add(len(state.cachedFeeds))
+
+ f := func(feed CachedFeed, wg *sync.WaitGroup) {
+ goFunc(feed)
+ wg.Done()
+ }
+
+ for _, feed := range state.cachedFeeds {
+ go f(feed, &wg)
+ }
+ wg.Wait()
+}
+
+func (state *State) LoadCache(fileName string, forceNew bool) error {
+ var (
+ cache Cache
+ err error
+ )
+
+ if forceNew {
+ cache, err = newCache()
+ } else {
+ cache, err = LoadCache(fileName)
+ }
+
+ if err != nil {
+ return err
+ }
+ state.cache = cache
+
+ for name, feed := range state.feeds {
+ state.cachedFeeds[name] = cache.cachedFeed(feed)
+ }
+
+ // state.feeds must not be used after loading the cache --> nil it to enforce a panic on misuse
+ state.feeds = nil
+
+ return nil
+}
+
+func (state *State) StoreCache(fileName string) error {
+ return state.cache.store(fileName)
+}
+
+func (state *State) UnlockCache() {
+ _ = state.cache.Unlock()
+}
+
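+// Fetch fetches all feeds concurrently, marks each as checked, and returns
+// the number of successfully fetched feeds.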
+func (state *State) Fetch() int {
+ state.ForeachGo(handleFeed)
+
+ ctr := 0
+ for _, cf := range state.cachedFeeds {
+ success := cf.Feed().FetchSuccessful()
+ cf.Checked(!success)
+
+ if success {
+ ctr++
+ }
+ }
+
+ return ctr
+}
+
+func handleFeed(cf CachedFeed) {
+ feed := cf.Feed()
+ log.Printf("Fetching %s from %s", feed.Name, feed.Url)
+
+ err := feed.Parse()
+ if err != nil {
+ if feed.Url == "" || cf.Failures() >= feed.Global.MaxFailures {
+ log.Error(err)
+ } else {
+ log.Print(err)
+ }
+ }
+}
+
+func filterFeed(cf CachedFeed) {
+ cf.Feed().Filter(cf.Filter)
+}
+
+func (state *State) Filter() {
+ if log.IsDebug() {
+ // single threaded for better output
+ state.Foreach(filterFeed)
+ } else {
+ state.ForeachGo(filterFeed)
+ }
+}
+
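+// NewState creates a State from the configured feeds. A typical driver
+// sequence (sketch, error handling elided) is:
+//
+//  state, _ := NewState(cfg)
+//  _ = state.LoadCache("feed.cache", false)
+//  state.RemoveUndue()
+//  state.Fetch()
+//  state.Filter()
+//  _ = state.StoreCache("feed.cache")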
+func NewState(cfg *config.Config) (*State, error) {
+ state := State{
+ feeds: map[string]*feed.Feed{},
+ cachedFeeds: map[string]CachedFeed{},
+ cache: Cache{}, // loaded later on
+ cfg: cfg,
+ }
+
+ for name, parsedFeed := range cfg.Feeds {
+ feed, err := feed.Create(parsedFeed, cfg.GlobalOptions)
+ if err != nil {
+ return nil, err
+ }
+ state.feeds[name] = feed
+ }
+
+ return &state, nil
+}
+
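+// RemoveUndue removes feeds that are disabled or not yet due for an update
+// from the set of cached feeds.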
+func (state *State) RemoveUndue() {
+ for name, feed := range state.cachedFeeds {
+ if feed.Feed().Disable || !feed.Feed().NeedsUpdate(feed.Last()) {
+ delete(state.cachedFeeds, name)
+ }
+ }
+}
+
+func (state *State) NumFeeds() int {
+ return len(state.cachedFeeds)
+}