From 7106d5a6e7585dce5fdd552cca30063dd352dc23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20=27Necoro=27=20Neumann?= Date: Mon, 22 Feb 2021 22:54:43 +0100 Subject: Split cache and feed packages --- internal/feed/cache.go | 172 --------------------- internal/feed/cache/cache.go | 179 +++++++++++++++++++++ internal/feed/cache/cache_v1.go | 323 ++++++++++++++++++++++++++++++++++++++ internal/feed/cache/state.go | 146 ++++++++++++++++++ internal/feed/cache_v1.go | 324 --------------------------------------- internal/feed/feed.go | 86 +++++++++-- internal/feed/item.go | 53 ++++--- internal/feed/mail.go | 24 +-- internal/feed/parse.go | 20 +-- internal/feed/state.go | 184 ---------------------- internal/feed/template/html.tpl | 4 +- internal/feed/template/text.tpl | 2 +- main.go | 15 +- tools/print-cache/print-cache.go | 4 +- 14 files changed, 781 insertions(+), 755 deletions(-) delete mode 100644 internal/feed/cache.go create mode 100644 internal/feed/cache/cache.go create mode 100644 internal/feed/cache/cache_v1.go create mode 100644 internal/feed/cache/state.go delete mode 100644 internal/feed/cache_v1.go delete mode 100644 internal/feed/state.go diff --git a/internal/feed/cache.go b/internal/feed/cache.go deleted file mode 100644 index 5f5c394..0000000 --- a/internal/feed/cache.go +++ /dev/null @@ -1,172 +0,0 @@ -package feed - -import ( - "bufio" - "encoding/gob" - "errors" - "fmt" - "os" - "path/filepath" - "time" - - "github.com/nightlyone/lockfile" - - "github.com/Necoro/feed2imap-go/pkg/log" -) - -type Version byte - -const ( - currentVersion Version = 1 -) - -type CacheImpl interface { - findItem(*Feed) CachedFeed - Version() Version - Info() string - SpecificInfo(interface{}) string - transformToCurrent() (CacheImpl, error) -} - -type Cache struct { - CacheImpl - lock lockfile.Lockfile - locked bool -} - -type CachedFeed interface { - Checked(withFailure bool) - Failures() int - Last() time.Time - ID() string - filterItems(items []item, ignoreHash bool, alwaysNew bool) []item - Commit() -} - -func cacheForVersion(version Version) (CacheImpl, error) { - switch version { - case v1Version: - return newV1Cache(), nil - default: - return nil, fmt.Errorf("unknown cache version '%d'", version) - } -} - -func lockName(fileName string) (string, error) { - return filepath.Abs(fileName + ".lck") -} - -func lock(fileName string) (lock lockfile.Lockfile, err error) { - var lockFile string - - if lockFile, err = lockName(fileName); err != nil { - return - } - log.Debugf("Handling lock file '%s'", lockFile) - - if lock, err = lockfile.New(lockFile); err != nil { - err = fmt.Errorf("Creating lock file: %w", err) - return - } - - if err = lock.TryLock(); err != nil { - err = fmt.Errorf("Locking cache: %w", err) - return - } - - return -} - -func (cache *Cache) store(fileName string) error { - if cache.CacheImpl == nil { - return fmt.Errorf("trying to store nil cache") - } - if cache.Version() != currentVersion { - return fmt.Errorf("trying to store cache with unsupported version '%d' (current: '%d')", cache.Version(), currentVersion) - } - - f, err := os.Create(fileName) - if err != nil { - return fmt.Errorf("trying to store cache to '%s': %w", fileName, err) - } - defer f.Close() - - writer := bufio.NewWriter(f) - if err = writer.WriteByte(byte(currentVersion)); err != nil { - return fmt.Errorf("writing to '%s': %w", fileName, err) - } - - encoder := gob.NewEncoder(writer) - if err = encoder.Encode(cache.CacheImpl); err != nil { - return fmt.Errorf("encoding cache: %w", err) - } - - writer.Flush() - log.Printf("Stored cache to '%s'.", fileName) - - return cache.Unlock() -} - -func (cache *Cache) Unlock() error { - if cache.locked { - if err := cache.lock.Unlock(); err != nil { - return fmt.Errorf("Unlocking cache: %w", err) - } - } - cache.locked = false - return nil -} - -func newCache() (Cache, error) { - cache, err := cacheForVersion(currentVersion) - if err != nil { - return Cache{}, err - } - return Cache{ - CacheImpl: cache, - locked: false, - }, nil -} - -func LoadCache(fileName string) (Cache, error) { - f, err := os.Open(fileName) - if err != nil { - if errors.Is(err, os.ErrNotExist) { - // no cache there yet -- make new - return newCache() - } - return Cache{}, fmt.Errorf("opening cache at '%s': %w", fileName, err) - } - defer f.Close() - - lock, err := lock(fileName) - if err != nil { - return Cache{}, err - } - - log.Printf("Loading cache from '%s'", fileName) - - reader := bufio.NewReader(f) - version, err := reader.ReadByte() - if err != nil { - return Cache{}, fmt.Errorf("reading from '%s': %w", fileName, err) - } - - cache, err := cacheForVersion(Version(version)) - if err != nil { - return Cache{}, err - } - - decoder := gob.NewDecoder(reader) - if err = decoder.Decode(cache); err != nil { - return Cache{}, fmt.Errorf("decoding for version '%d' from '%s': %w", version, fileName, err) - } - - if cache, err = cache.transformToCurrent(); err != nil { - return Cache{}, fmt.Errorf("cannot transform from version %d to %d: %w", version, currentVersion, err) - } - - log.Printf("Loaded cache (version %d), transformed to version %d.", version, currentVersion) - - return Cache{cache, lock, true}, nil -} diff --git a/internal/feed/cache/cache.go b/internal/feed/cache/cache.go new file mode 100644 index 0000000..e7fe673 --- /dev/null +++ b/internal/feed/cache/cache.go @@ -0,0 +1,179 @@ +package cache + +import ( + "bufio" + "encoding/gob" + "errors" + "fmt" + "os" + "path/filepath" + "time" + + "github.com/nightlyone/lockfile" + + "github.com/Necoro/feed2imap-go/internal/feed" + "github.com/Necoro/feed2imap-go/pkg/log" +) + +type Version byte + +const ( + currentVersion Version = 1 +) + +type Impl interface { + cachedFeed(*feed.Feed) CachedFeed + transformToCurrent() (Impl, error) + Version() Version + Info() string + SpecificInfo(interface{}) string +} + +type Cache struct { + Impl + lock lockfile.Lockfile + locked bool +} + +type CachedFeed interface { + // Checked marks the feed as being a failure or a success on last check. + Checked(withFailure bool) + // Failures of this feed up to now. + Failures() int + // The Last time, this feed has been checked + Last() time.Time + // Filter the given items against the cached items. + Filter(items []feed.Item, ignoreHash bool, alwaysNew bool) []feed.Item + // Commit any changes done to the cache state. + Commit() + // The Feed, that is cached. + Feed() *feed.Feed +} + +func cacheForVersion(version Version) (Impl, error) { + switch version { + case v1Version: + return newV1Cache(), nil + default: + return nil, fmt.Errorf("unknown cache version '%d'", version) + } +} + +func lockName(fileName string) (string, error) { + return filepath.Abs(fileName + ".lck") +} + +func lock(fileName string) (lock lockfile.Lockfile, err error) { + var lockFile string + + if lockFile, err = lockName(fileName); err != nil { + return + } + log.Debugf("Handling lock file '%s'", lockFile) + + if lock, err = lockfile.New(lockFile); err != nil { + err = fmt.Errorf("Creating lock file: %w", err) + return + } + + if err = lock.TryLock(); err != nil { + err = fmt.Errorf("Locking cache: %w", err) + return + } + + return +} + +func (cache *Cache) store(fileName string) error { + if cache.Impl == nil { + return fmt.Errorf("trying to store nil cache") + } + if cache.Version() != currentVersion { + return fmt.Errorf("trying to store cache with unsupported version '%d' (current: '%d')", cache.Version(), currentVersion) + } + + f, err := os.Create(fileName) + if err != nil { + return fmt.Errorf("trying to store cache to '%s': %w", fileName, err) + } + defer f.Close() + + writer := bufio.NewWriter(f) + if err = writer.WriteByte(byte(currentVersion)); err != nil { + return fmt.Errorf("writing to '%s': %w", fileName, err) + } + + encoder := gob.NewEncoder(writer) + if err = encoder.Encode(cache.Impl); err != nil { + return fmt.Errorf("encoding cache: %w", err) + } + + writer.Flush() + log.Printf("Stored cache to '%s'.", fileName) + + return cache.Unlock() +} + +func (cache *Cache) Unlock() error { + if cache.locked { + if err := cache.lock.Unlock(); err != nil { + return fmt.Errorf("Unlocking cache: %w", err) + } + } + cache.locked = false + return nil +} + +func newCache() (Cache, error) { + cache, err := cacheForVersion(currentVersion) + if err != nil { + return Cache{}, err + } + return Cache{ + Impl: cache, + locked: false, + }, nil +} + +func LoadCache(fileName string) (Cache, error) { + f, err := os.Open(fileName) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + // no cache there yet -- make new + return newCache() + } + return Cache{}, fmt.Errorf("opening cache at '%s': %w", fileName, err) + } + defer f.Close() + + lock, err := lock(fileName) + if err != nil { + return Cache{}, err + } + + log.Printf("Loading cache from '%s'", fileName) + + reader := bufio.NewReader(f) + version, err := reader.ReadByte() + if err != nil { + return Cache{}, fmt.Errorf("reading from '%s': %w", fileName, err) + } + + cache, err := cacheForVersion(Version(version)) + if err != nil { + return Cache{}, err + } + + decoder := gob.NewDecoder(reader) + if err = decoder.Decode(cache); err != nil { + return Cache{}, fmt.Errorf("decoding for version '%d' from '%s': %w", version, fileName, err) + } + + if cache, err = cache.transformToCurrent(); err != nil { + return Cache{}, fmt.Errorf("cannot transform from version %d to %d: %w", version, currentVersion, err) + } + + log.Printf("Loaded cache (version %d), transformed to version %d.", version, currentVersion) + + return Cache{cache, lock, true}, nil +} diff --git a/internal/feed/cache/cache_v1.go b/internal/feed/cache/cache_v1.go new file mode 100644 index 0000000..d754b00 --- /dev/null +++ b/internal/feed/cache/cache_v1.go @@ -0,0 +1,323 @@ +package cache + +import ( + "crypto/sha256" + "encoding/base64" + "encoding/hex" + "fmt" + "strconv" + "strings" + "time" + + "github.com/google/uuid" + + "github.com/Necoro/feed2imap-go/internal/feed" + "github.com/Necoro/feed2imap-go/pkg/log" + "github.com/Necoro/feed2imap-go/pkg/util" +) + +const ( + v1Version Version = 1 + startFeedId uint64 = 1 +) + +type feedId uint64 + +func (id feedId) String() string { + return strconv.FormatUint(uint64(id), 16) +} + +func idFromString(s string) feedId { + id, _ := strconv.ParseUint(s, 16, 64) + return feedId(id) +} + +type v1Cache struct { + Ids map[feed.Descriptor]feedId + NextId uint64 + Feeds map[feedId]*cachedFeed +} + +type cachedFeed struct { + feed *feed.Feed + id feedId // not saved, has to be set on loading + LastCheck time.Time + currentCheck time.Time + NumFailures int // can't be named `Failures` b/c it'll collide with the interface + Items []cachedItem + newItems []cachedItem +} + +type itemHash [sha256.Size]byte + +func (h itemHash) String() string { + return hex.EncodeToString(h[:]) +} + +type cachedItem struct { + Guid string + Title string + Link string + Date time.Time + UpdatedCache time.Time + Hash itemHash + ID uuid.UUID +} + +func (item cachedItem) String() string { + return fmt.Sprintf(`{ + ID: %s + Title: %q + Guid: %q + Link: %q + Date: %s + Hash: %s +}`, + base64.RawURLEncoding.EncodeToString(item.ID[:]), + item.Title, item.Guid, item.Link, util.TimeFormat(item.Date), item.Hash) +} + +func (cf *cachedFeed) Checked(withFailure bool) { + cf.currentCheck = time.Now() + if withFailure { + cf.NumFailures++ + } else { + cf.NumFailures = 0 + } +} + +func (cf *cachedFeed) Commit() { + if cf.newItems != nil { + cf.Items = cf.newItems + cf.newItems = nil + } + cf.LastCheck = cf.currentCheck +} + +func (cf *cachedFeed) Failures() int { + return cf.NumFailures +} + +func (cf *cachedFeed) Last() time.Time { + return cf.LastCheck +} + +func (cf *cachedFeed) Feed() *feed.Feed { + return cf.feed +} + +func (cache *v1Cache) Version() Version { + return v1Version +} + +func (cache *v1Cache) Info() string { + b := strings.Builder{} + for descr, id := range cache.Ids { + b.WriteString(fmt.Sprintf("%3s: %s (%s)\n", id.String(), descr.Name, descr.Url)) + } + return b.String() +} + +func (cache *v1Cache) SpecificInfo(i interface{}) string { + id := idFromString(i.(string)) + + b := strings.Builder{} + feed := cache.Feeds[id] + + for descr, fId := range cache.Ids { + if id == fId { + b.WriteString(descr.Name) + b.WriteString(" -- ") + b.WriteString(descr.Url) + b.WriteByte('\n') + break + } + } + + b.WriteString(fmt.Sprintf(` +Last Check: %s +Num Failures: %d +Num Items: %d +`, + util.TimeFormat(feed.LastCheck), + feed.NumFailures, + len(feed.Items))) + + for _, item := range feed.Items { + b.WriteString("\n--------------------\n") + b.WriteString(item.String()) + } + return b.String() +} + +func newV1Cache() *v1Cache { + cache := v1Cache{ + Ids: map[feed.Descriptor]feedId{}, + Feeds: map[feedId]*cachedFeed{}, + NextId: startFeedId, + } + return &cache +} + +func (cache *v1Cache) transformToCurrent() (Impl, error) { + return cache, nil +} + +func (cache *v1Cache) getItem(id feedId) *cachedFeed { + feed, ok := cache.Feeds[id] + if !ok { + feed = &cachedFeed{} + cache.Feeds[id] = feed + } + feed.id = id + return feed +} + +func (cache *v1Cache) cachedFeed(f *feed.Feed) CachedFeed { + fDescr := f.Descriptor() + id, ok := cache.Ids[fDescr] + if !ok { + var otherId feed.Descriptor + changed := false + for otherId, id = range cache.Ids { + if otherId.Name == fDescr.Name { + log.Warnf("Feed %s seems to have changed URLs: new '%s', old '%s'. Updating.", + fDescr.Name, fDescr.Url, otherId.Url) + changed = true + break + } else if otherId.Url == fDescr.Url { + log.Warnf("Feed with URL '%s' seems to have changed its name: new '%s', old '%s'. Updating.", + fDescr.Url, fDescr.Name, otherId.Name) + changed = true + break + } + } + if changed { + delete(cache.Ids, otherId) + } else { + id = feedId(cache.NextId) + cache.NextId++ + } + + cache.Ids[fDescr] = id + } + + cf := cache.getItem(id) + cf.feed = f + f.SetExtID(id) + return cf +} + +func newCachedItem(item *feed.Item) cachedItem { + var ci cachedItem + + ci.ID = item.ID + ci.Title = item.Item.Title + ci.Link = item.Item.Link + if item.DateParsed() != nil { + ci.Date = *item.DateParsed() + } + ci.Guid = item.Item.GUID + + contentByte := []byte(item.Item.Description + item.Item.Content) + ci.Hash = sha256.Sum256(contentByte) + + return ci +} + +func (item *cachedItem) similarTo(other *cachedItem, ignoreHash bool) bool { + return other.Title == item.Title && + other.Link == item.Link && + other.Date.Equal(item.Date) && + (ignoreHash || other.Hash == item.Hash) +} + +func (cf *cachedFeed) deleteItem(index int) { + copy(cf.Items[index:], cf.Items[index+1:]) + cf.Items[len(cf.Items)-1] = cachedItem{} + cf.Items = cf.Items[:len(cf.Items)-1] +} + +func (cf *cachedFeed) Filter(items []feed.Item, ignoreHash, alwaysNew bool) []feed.Item { + if len(items) == 0 { + return items + } + + cacheItems := make(map[cachedItem]*feed.Item, len(items)) + for idx := range items { + // remove complete duplicates on the go + cacheItems[newCachedItem(&items[idx])] = &items[idx] + } + log.Debugf("%d items after deduplication", len(cacheItems)) + + filtered := make([]feed.Item, 0, len(items)) + cacheadd := make([]cachedItem, 0, len(items)) + app := func(item *feed.Item, ci cachedItem, oldIdx *int) { + if oldIdx != nil { + item.UpdateOnly = true + prevId := cf.Items[*oldIdx].ID + ci.ID = prevId + item.ID = prevId + log.Debugf("oldIdx: %d, prevId: %s, item.id: %s", *oldIdx, prevId, item.Id()) + cf.deleteItem(*oldIdx) + } + filtered = append(filtered, *item) + cacheadd = append(cacheadd, ci) + } + +CACHE_ITEMS: + for ci, item := range cacheItems { + log.Debugf("Now checking %s", ci) + + if ci.Guid != "" { + for idx, oldItem := range cf.Items { + if oldItem.Guid == ci.Guid { + log.Debugf("Guid matches with: %s", oldItem) + if !oldItem.similarTo(&ci, ignoreHash) { + item.AddReason("guid (upd)") + app(item, ci, &idx) + } else { + log.Debugf("Similar, ignoring item %s", base64.RawURLEncoding.EncodeToString(oldItem.ID[:])) + } + + continue CACHE_ITEMS + } + } + + log.Debug("Found no matching GUID, including.") + item.AddReason("guid") + app(item, ci, nil) + continue + } + + for idx, oldItem := range cf.Items { + if oldItem.similarTo(&ci, ignoreHash) { + log.Debugf("Similarity matches, ignoring: %s", oldItem) + continue CACHE_ITEMS + } + + if oldItem.Link == ci.Link { + if alwaysNew { + log.Debugf("Link matches, but `always-new`.") + item.AddReason("always-new") + continue + } + log.Debugf("Link matches, updating: %s", oldItem) + item.AddReason("link (upd)") + app(item, ci, &idx) + + continue CACHE_ITEMS + } + } + + log.Debugf("No match found, inserting.") + item.AddReason("new") + app(item, ci, nil) + } + + log.Debugf("%d items after filtering", len(filtered)) + + cf.newItems = append(cacheadd, cf.Items...) + + return filtered +} diff --git a/internal/feed/cache/state.go b/internal/feed/cache/state.go new file mode 100644 index 0000000..c2e7de8 --- /dev/null +++ b/internal/feed/cache/state.go @@ -0,0 +1,146 @@ +package cache + +import ( + "sync" + + "github.com/Necoro/feed2imap-go/internal/feed" + "github.com/Necoro/feed2imap-go/pkg/config" + "github.com/Necoro/feed2imap-go/pkg/log" +) + +type State struct { + feeds map[string]*feed.Feed + cachedFeeds map[string]CachedFeed + cache Cache + cfg *config.Config +} + +func (state *State) Foreach(f func(CachedFeed)) { + for _, feed := range state.cachedFeeds { + f(feed) + } +} + +func (state *State) ForeachGo(goFunc func(CachedFeed)) { + var wg sync.WaitGroup + wg.Add(len(state.feeds)) + + f := func(feed CachedFeed, wg *sync.WaitGroup) { + goFunc(feed) + wg.Done() + } + + for _, feed := range state.cachedFeeds { + go f(feed, &wg) + } + wg.Wait() +} + +func (state *State) LoadCache(fileName string, forceNew bool) error { + var ( + cache Cache + err error + ) + + if forceNew { + cache, err = newCache() + } else { + cache, err = LoadCache(fileName) + } + + if err != nil { + return err + } + state.cache = cache + + for name, feed := range state.feeds { + state.cachedFeeds[name] = cache.cachedFeed(feed) + } + + // state.feeds should not be used after loading the cache --> enforce a panic + state.feeds = nil + + return nil +} + +func (state *State) StoreCache(fileName string) error { + return state.cache.store(fileName) +} + +func (state *State) UnlockCache() { + _ = state.cache.Unlock() +} + +func (state *State) Fetch() int { + state.ForeachGo(handleFeed) + + ctr := 0 + for _, cf := range state.cachedFeeds { + success := cf.Feed().FetchSuccessful() + cf.Checked(!success) + + if success { + ctr++ + } + } + + return ctr +} + +func handleFeed(cf CachedFeed) { + feed := cf.Feed() + log.Printf("Fetching %s from %s", feed.Name, feed.Url) + + err := feed.Parse() + if err != nil { + if feed.Url == "" || cf.Failures() >= feed.Global.MaxFailures { + log.Error(err) + } else { + log.Print(err) + } + } +} + +func filterFeed(cf CachedFeed) { + cf.Feed().Filter(cf.Filter) +} + +func (state *State) Filter() { + if log.IsDebug() { + // single threaded for better output + state.Foreach(filterFeed) + } else { + state.ForeachGo(filterFeed) + } +} + +func NewState(cfg *config.Config) (*State, error) { + state := State{ + feeds: map[string]*feed.Feed{}, + cachedFeeds: map[string]CachedFeed{}, + cache: Cache{}, // loaded later on + cfg: cfg, + } + + for name, parsedFeed := range cfg.Feeds { + feed, err := feed.Create(parsedFeed, cfg.GlobalOptions) + if err != nil { + return nil, err + } + state.feeds[name] = feed + } + + return &state, nil +} + +func (state *State) RemoveUndue() { + for name, feed := range state.cachedFeeds { + if feed.Feed().Disable || !feed.Feed().NeedsUpdate(feed.Last()) { + delete(state.cachedFeeds, name) + } + } +} + +func (state *State) NumFeeds() int { + return len(state.cachedFeeds) +} diff --git a/internal/feed/cache_v1.go b/internal/feed/cache_v1.go deleted file mode 100644 index a80e81c..0000000 --- a/internal/feed/cache_v1.go +++ /dev/null @@ -1,324 +0,0 @@ -package feed - -import ( - "crypto/sha256" - "encoding/base64" - "encoding/hex" - "fmt" - "strconv" - "strings" - "time" - - "github.com/google/uuid" - - "github.com/Necoro/feed2imap-go/pkg/log" - "github.com/Necoro/feed2imap-go/pkg/util" -) - -const ( - v1Version Version = 1 - startFeedId uint64 = 1 -) - -type feedId uint64 - -func (id feedId) String() string { - return strconv.FormatUint(uint64(id), 16) -} - -func idFromString(s string) feedId { - id, _ := strconv.ParseUint(s, 16, 64) - return feedId(id) -} - -type v1Cache struct { - Ids map[feedDescriptor]feedId - NextId uint64 - Feeds map[feedId]*cachedFeed -} - -type cachedFeed struct { - id feedId // not saved, has to be set on loading - LastCheck time.Time - currentCheck time.Time - NumFailures int // can't be named `Failures` b/c it'll collide with the interface - Items []cachedItem - newItems []cachedItem -} - -type itemHash [sha256.Size]byte - -func (h itemHash) String() string { - return hex.EncodeToString(h[:]) -} - -type cachedItem struct { - Guid string - Title string - Link string - Date time.Time - UpdatedCache time.Time - Hash itemHash - ID uuid.UUID -} - -func (item cachedItem) String() string { - return fmt.Sprintf(`{ - ID: %s - Title: %q - Guid: %q - Link: %q - Date: %s - Hash: %s -}`, - base64.RawURLEncoding.EncodeToString(item.ID[:]), - item.Title, item.Guid, item.Link, util.TimeFormat(item.Date), item.Hash) -} - -func (cf *cachedFeed) Checked(withFailure bool) { - cf.currentCheck = time.Now() - if withFailure { - cf.NumFailures++ - } else { - cf.NumFailures = 0 - } -} - -func (cf *cachedFeed) Commit() { - if cf.newItems != nil { - cf.Items = cf.newItems - cf.newItems = nil - } - cf.LastCheck = cf.currentCheck -} - -func (cf *cachedFeed) Failures() int { - return cf.NumFailures -} - -func (cf *cachedFeed) Last() time.Time { - return cf.LastCheck -} - -func (cf *cachedFeed) ID() string { - return cf.id.String() -} - -func (cache *v1Cache) Version() Version { - return v1Version -} - -func (cache *v1Cache) Info() string { - b := strings.Builder{} - for descr, id := range cache.Ids { - b.WriteString(fmt.Sprintf("%3s: %s (%s)\n", id.String(), descr.Name, descr.Url)) - } - return b.String() -} - -func (cache *v1Cache) SpecificInfo(i interface{}) string { - id := idFromString(i.(string)) - - b := strings.Builder{} - feed := cache.Feeds[id] - - for descr, fId := range cache.Ids { - if id == fId { - b.WriteString(descr.Name) - b.WriteString(" -- ") - b.WriteString(descr.Url) - b.WriteByte('\n') - break - } - } - - b.WriteString(fmt.Sprintf(` -Last Check: %s -Num Failures: %d -Num Items: %d -`, - util.TimeFormat(feed.LastCheck), - feed.NumFailures, - len(feed.Items))) - - for _, item := range feed.Items { - b.WriteString("\n--------------------\n") - b.WriteString(item.String()) - } - return b.String() -} - -func newV1Cache() *v1Cache { - cache := v1Cache{ - Ids: map[feedDescriptor]feedId{}, - Feeds: map[feedId]*cachedFeed{}, - NextId: startFeedId, - } - return &cache -} - -func (cache *v1Cache) transformToCurrent() (CacheImpl, error) { - return cache, nil -} - -func (cache *v1Cache) getItem(id feedId) CachedFeed { - feed, ok := cache.Feeds[id] - if !ok { - feed = &cachedFeed{} - cache.Feeds[id] = feed - } - feed.id = id - return feed -} - -func (cache *v1Cache) findItem(feed *Feed) CachedFeed { - if feed.cached != nil { - return feed.cached.(*cachedFeed) - } - - fDescr := feed.descriptor() - id, ok := cache.Ids[fDescr] - if !ok { - var otherId feedDescriptor - changed := false - for otherId, id = range cache.Ids { - if otherId.Name == fDescr.Name { - log.Warnf("Feed %s seems to have changed URLs: new '%s', old '%s'. Updating.", - fDescr.Name, fDescr.Url, otherId.Url) - changed = true - break - } else if otherId.Url == fDescr.Url { - log.Warnf("Feed with URL '%s' seems to have changed its name: new '%s', old '%s'. Updating.", - fDescr.Url, fDescr.Name, otherId.Name) - changed = true - break - } - } - if changed { - delete(cache.Ids, otherId) - } else { - id = feedId(cache.NextId) - cache.NextId++ - } - - cache.Ids[fDescr] = id - } - - item := cache.getItem(id) - feed.cached = item - return item -} - -func (item *item) newCachedItem() cachedItem { - var ci cachedItem - - ci.ID = item.itemId - ci.Title = item.Item.Title - ci.Link = item.Item.Link - if item.DateParsed() != nil { - ci.Date = *item.DateParsed() - } - ci.Guid = item.Item.GUID - - contentByte := []byte(item.Item.Description + item.Item.Content) - ci.Hash = sha256.Sum256(contentByte) - - return ci -} - -func (item *cachedItem) similarTo(other *cachedItem, ignoreHash bool) bool { - return other.Title == item.Title && - other.Link == item.Link && - other.Date.Equal(item.Date) && - (ignoreHash || other.Hash == item.Hash) -} - -func (cf *cachedFeed) deleteItem(index int) { - copy(cf.Items[index:], cf.Items[index+1:]) - cf.Items[len(cf.Items)-1] = cachedItem{} - cf.Items = cf.Items[:len(cf.Items)-1] -} - -func (cf *cachedFeed) filterItems(items []item, ignoreHash, alwaysNew bool) []item { - if len(items) == 0 { - return items - } - - cacheItems := make(map[cachedItem]*item, len(items)) - for idx := range items { - // remove complete duplicates on the go - cacheItems[items[idx].newCachedItem()] = &items[idx] - } - log.Debugf("%d items after deduplication", len(cacheItems)) - - filtered := make([]item, 0, len(items)) - cacheadd := make([]cachedItem, 0, len(items)) - app := func(item *item, ci cachedItem, oldIdx *int) { - if oldIdx != nil { - item.updateOnly = true - prevId := cf.Items[*oldIdx].ID - ci.ID = prevId - item.itemId = prevId - log.Debugf("oldIdx: %d, prevId: %s, item.id: %s", *oldIdx, prevId, item.id()) - cf.deleteItem(*oldIdx) - } - filtered = append(filtered, *item) - cacheadd = append(cacheadd, ci) - } - -CACHE_ITEMS: - for ci, item := range cacheItems { - log.Debugf("Now checking %s", ci) - - if ci.Guid != "" { - for idx, oldItem := range cf.Items { - if oldItem.Guid == ci.Guid { - log.Debugf("Guid matches with: %s", oldItem) - if !oldItem.similarTo(&ci, ignoreHash) { - item.addReason("guid (upd)") - app(item, ci, &idx) - } else { - log.Debugf("Similar, ignoring item %s", base64.RawURLEncoding.EncodeToString(oldItem.ID[:])) - } - - continue CACHE_ITEMS - } - } - - log.Debug("Found no matching GUID, including.") - item.addReason("guid") - app(item, ci, nil) - continue - } - - for idx, oldItem := range cf.Items { - if oldItem.similarTo(&ci, ignoreHash) { - log.Debugf("Similarity matches, ignoring: %s", oldItem) - continue CACHE_ITEMS - } - - if oldItem.Link == ci.Link { - if alwaysNew { - log.Debugf("Link matches, but `always-new`.") - item.addReason("always-new") - continue - } - log.Debugf("Link matches, updating: %s", oldItem) - item.addReason("link (upd)") - app(item, ci, &idx) - - continue CACHE_ITEMS - } - } - - log.Debugf("No match found, inserting.") - item.addReason("new") - app(item, ci, nil) - } - - log.Debugf("%d items after filtering", len(filtered)) - - cf.newItems = append(cacheadd, cf.Items...) - - return filtered -} diff --git a/internal/feed/feed.go b/internal/feed/feed.go index de28ef8..431eae3 100644 --- a/internal/feed/feed.go +++ b/internal/feed/feed.go @@ -16,24 +16,30 @@ type Feed struct { *config.Feed feed *gofeed.Feed filter *filter.Filter - items []item - cached CachedFeed + items []Item Global config.GlobalOptions + extID FeedID } -type feedDescriptor struct { +type FeedID interface { + String() string +} + +type FilterFunc func(items []Item, ignHash, alwaysNew bool) []Item + +type Descriptor struct { Name string Url string } -func (feed *Feed) descriptor() feedDescriptor { +func (feed *Feed) Descriptor() Descriptor { var url string if feed.Url != "" { url = feed.Url } else { url = "exec://" + strings.Join(feed.Exec, "/") } - return feedDescriptor{ + return Descriptor{ Name: feed.Name, Url: url, } @@ -54,12 +60,6 @@ func (feed *Feed) FetchSuccessful() bool { return feed.feed != nil } -func (feed *Feed) MarkSuccess() { - if feed.cached != nil { - feed.cached.Commit() - } -} - func Create(parsedFeed *config.Feed, global config.GlobalOptions) (*Feed, error) { var itemFilter *filter.Filter var err error @@ -70,3 +70,67 @@ func Create(parsedFeed *config.Feed, global config.GlobalOptions) (*Feed, error) } return &Feed{Feed: parsedFeed, Global: global, filter: itemFilter}, nil } + +func (feed *Feed) filterItems() []Item { + if feed.filter == nil { + return feed.items + } + + items := make([]Item, 0, len(feed.items)) + + for _, item := range feed.items { + res, err := feed.filter.Run(item.Item) + if err != nil { + log.Errorf("Feed %s: Item %s: Error applying item filter: %s", feed.Name, printItem(item.Item), err) + res = true // include + } + + if res { + if log.IsDebug() { + log.Debugf("Filter '%s' matches for item %s", feed.ItemFilter, printItem(item.Item)) + } + items = append(items, item) + } else if log.IsDebug() { // printItem is not for free + log.Debugf("Filter '%s' does not match for item %s", feed.ItemFilter, printItem(item.Item)) + } + } + return items +} + +func (feed *Feed) Filter(filter FilterFunc) { + if len(feed.items) > 0 { + origLen := len(feed.items) + + log.Debugf("Filtering %s. Starting with %d items", feed.Name, origLen) + + items := feed.filterItems() + newLen := len(items) + if newLen < origLen { + log.Printf("Item filter on %s: Reduced from %d to %d items.", feed.Name, origLen, newLen) + origLen = newLen + } + + feed.items = filter(items, feed.IgnHash, feed.AlwaysNew) + + newLen = len(feed.items) + if newLen < origLen { + log.Printf("Filtered %s. Reduced from %d to %d items.", feed.Name, origLen, newLen) + } else { + log.Printf("Filtered %s, no reduction.", feed.Name) + } + + } else { + log.Debugf("No items for %s. No filtering.", feed.Name) + } +} + +func (feed *Feed) SetExtID(extID FeedID) { + feed.extID = extID +} + +func (feed *Feed) id() string { + if feed.extID == nil { + return feed.Name + } + return feed.extID.String() +} diff --git a/internal/feed/item.go b/internal/feed/item.go index 3cb9089..39f41ba 100644 --- a/internal/feed/item.go +++ b/internal/feed/item.go @@ -2,6 +2,7 @@ package feed import ( "encoding/base64" + "encoding/json" "fmt" "time" @@ -17,26 +18,26 @@ type feedImage struct { mime string } -type item struct { - *gofeed.Item - Feed *gofeed.Feed - feed *Feed - Body string - TextBody string - updateOnly bool - reasons []string - images []feedImage - itemId uuid.UUID +type Item struct { + *gofeed.Item // access fields implicitly + Feed *gofeed.Feed // named explicitly to not shadow common fields with Item + feed *Feed + Body string + TextBody string + UpdateOnly bool + reasons []string + images []feedImage + ID uuid.UUID } -func (item *item) DateParsed() *time.Time { +func (item *Item) DateParsed() *time.Time { if item.UpdatedParsed == nil || item.UpdatedParsed.IsZero() { return item.PublishedParsed } return item.UpdatedParsed } -func (item *item) Date() string { +func (item *Item) Date() string { if item.Updated == "" { return item.Published } @@ -44,14 +45,14 @@ func (item *item) Date() string { } // Creator returns the name of the creating author. -func (item *item) Creator() string { +func (item *Item) Creator() string { if item.Author != nil { return item.Author.Name } return "" } -func (item *item) FeedLink() string { +func (item *Item) FeedLink() string { if item.Feed.FeedLink != "" { // the one in the feed itself return item.Feed.FeedLink @@ -60,31 +61,37 @@ func (item *item) FeedLink() string { return item.feed.Url } -func (item *item) addReason(reason string) { +func (item *Item) AddReason(reason string) { if !util.StrContains(item.reasons, reason) { item.reasons = append(item.reasons, reason) } } -func (item *item) addImage(img []byte, mime string) int { +func (item *Item) addImage(img []byte, mime string) int { i := feedImage{img, mime} item.images = append(item.images, i) return len(item.images) } -func (item *item) clearImages() { +func (item *Item) clearImages() { item.images = []feedImage{} } -func (item *item) defaultEmail() string { +func (item *Item) defaultEmail() string { return item.feed.Global.DefaultEmail } -func (item *item) id() string { - idStr := base64.RawURLEncoding.EncodeToString(item.itemId[:]) - return item.feed.cached.ID() + "#" + idStr +func (item *Item) Id() string { + idStr := base64.RawURLEncoding.EncodeToString(item.ID[:]) + return item.feed.id() + "#" + idStr } -func (item *item) messageId() string { - return fmt.Sprintf("", item.id(), config.Hostname()) +func (item *Item) messageId() string { + return fmt.Sprintf("", item.Id(), config.Hostname()) +} + +func printItem(item *gofeed.Item) string { + // analogous to gofeed.Feed.String + json, _ := json.MarshalIndent(item, "", " ") + return string(json) } diff --git a/internal/feed/mail.go b/internal/feed/mail.go index b201d61..636f6a0 100644 --- a/internal/feed/mail.go +++ b/internal/feed/mail.go @@ -31,7 +31,7 @@ func address(name, address string) []*mail.Address { return []*mail.Address{{Name: name, Address: address}} } -func (item *item) fromAddress() []*mail.Address { +func (item *Item) fromAddress() []*mail.Address { switch { case item.Author != nil && item.Author.Email != "": return address(item.Author.Name, item.Author.Email) @@ -46,11 +46,11 @@ func (item *item) fromAddress() []*mail.Address { } } -func (item *item) toAddress() []*mail.Address { +func (item *Item) toAddress() []*mail.Address { return address(item.feed.Name, item.defaultEmail()) } -func (item *item) buildHeader() message.Header { +func (item *Item) buildHeader() message.Header { var h mail.Header h.SetContentType("multipart/alternative", nil) h.SetAddressList("From", item.fromAddress()) @@ -58,7 +58,7 @@ func (item *item) buildHeader() message.Header { h.Set("Message-Id", item.messageId()) h.Set(msg.VersionHeader, version.Version()) h.Set(msg.ReasonHeader, strings.Join(item.reasons, ",")) - h.Set(msg.IdHeader, item.id()) + h.Set(msg.IdHeader, item.Id()) h.Set(msg.CreateHeader, time.Now().Format(time.RFC1123Z)) if item.GUID != "" { h.Set(msg.GuidHeader, item.GUID) @@ -86,7 +86,7 @@ func (item *item) buildHeader() message.Header { return h.Header } -func (item *item) writeContentPart(w *message.Writer, typ string, tpl template.Template) error { +func (item *Item) writeContentPart(w *message.Writer, typ string, tpl template.Template) error { var ih message.Header ih.SetContentType("text/"+typ, map[string]string{"charset": "utf-8"}) ih.SetContentDisposition("inline", nil) @@ -105,11 +105,11 @@ func (item *item) writeContentPart(w *message.Writer, typ string, tpl template.T return nil } -func (item *item) writeTextPart(w *message.Writer) error { +func (item *Item) writeTextPart(w *message.Writer) error { return item.writeContentPart(w, "plain", template.Text) } -func (item *item) writeHtmlPart(w *message.Writer) error { +func (item *Item) writeHtmlPart(w *message.Writer) error { return item.writeContentPart(w, "html", template.Html) } @@ -133,7 +133,7 @@ func (img *feedImage) writeImagePart(w *message.Writer, cid string) error { return nil } -func (item *item) writeToBuffer(b *bytes.Buffer) error { +func (item *Item) writeToBuffer(b *bytes.Buffer) error { h := item.buildHeader() item.buildBody() @@ -178,7 +178,7 @@ func (item *item) writeToBuffer(b *bytes.Buffer) error { return nil } -func (item *item) message() (msg.Message, error) { +func (item *Item) message() (msg.Message, error) { var b bytes.Buffer if err := item.writeToBuffer(&b); err != nil { @@ -187,8 +187,8 @@ func (item *item) message() (msg.Message, error) { msg := msg.Message{ Content: b.String(), - IsUpdate: item.updateOnly, - ID: item.id(), + IsUpdate: item.UpdateOnly, + ID: item.Id(), } return msg, nil @@ -251,7 +251,7 @@ func getBody(content, description string, bodyCfg config.Body) string { } } -func (item *item) buildBody() { +func (item *Item) buildBody() { feed := item.feed var feedUrl *url.URL diff --git a/internal/feed/parse.go b/internal/feed/parse.go index b08c286..a2812bd 100644 --- a/internal/feed/parse.go +++ b/internal/feed/parse.go @@ -9,10 +9,9 @@ import ( "github.com/mmcdole/gofeed" "github.com/Necoro/feed2imap-go/internal/http" - "github.com/Necoro/feed2imap-go/pkg/log" ) -func (feed *Feed) parse() error { +func (feed *Feed) Parse() error { fp := gofeed.NewParser() var reader io.Reader @@ -58,22 +57,9 @@ func (feed *Feed) parse() error { } feed.feed = parsedFeed - feed.items = make([]item, len(parsedFeed.Items)) + feed.items = make([]Item, len(parsedFeed.Items)) for idx, feedItem := range parsedFeed.Items { - feed.items[idx] = item{Feed: parsedFeed, Item: feedItem, itemId: uuid.New(), feed: feed} + feed.items[idx] = Item{Feed: parsedFeed, feed: feed, Item: feedItem, ID: uuid.New()} } return cleanup() } - -func handleFeed(feed *Feed) { - log.Printf("Fetching %s from %s", feed.Name, feed.Url) - - err := feed.parse() - if err != nil { - if feed.Url == "" || feed.cached.Failures() >= feed.Global.MaxFailures { - log.Error(err) - } else { - log.Print(err) - } - } -} diff --git a/internal/feed/state.go b/internal/feed/state.go deleted file mode 100644 index 364616f..0000000 --- a/internal/feed/state.go +++ /dev/null @@ -1,184 +0,0 @@ -package feed - -import ( - "encoding/json" - "sync" - - "github.com/mmcdole/gofeed" - - "github.com/Necoro/feed2imap-go/pkg/config" - "github.com/Necoro/feed2imap-go/pkg/log" -) - -type State struct { - feeds map[string]*Feed - cache Cache - cfg *config.Config -} - -func (state *State) Foreach(f func(*Feed)) { - for _, feed := range state.feeds { - f(feed) - } -} - -func (state *State) ForeachGo(goFunc func(*Feed)) { - var wg sync.WaitGroup - wg.Add(len(state.feeds)) - - f := func(feed *Feed, wg *sync.WaitGroup) { - goFunc(feed) - wg.Done() - } - - for _, feed := range state.feeds { - go f(feed, &wg) - } - wg.Wait() -} - -func (state *State) LoadCache(fileName string, forceNew bool) error { - var ( - cache Cache - err error - ) - - if forceNew { - cache, err = newCache() - } else { - cache, err = LoadCache(fileName) - } - - if err != nil { - return err - } - state.cache = cache - - for _, feed := range state.feeds { - feed.cached = cache.findItem(feed) - } - return nil -} - -func (state *State) StoreCache(fileName string) error { - return state.cache.store(fileName) -} - -func (state *State) UnlockCache() { - _ = state.cache.Unlock() -} - -func (state *State) Fetch() int { - state.ForeachGo(handleFeed) - - ctr := 0 - for _, feed := range state.feeds { - success := feed.FetchSuccessful() - feed.cached.Checked(!success) - - if success { - ctr++ - } - } - - return ctr -} - -func printItem(item *gofeed.Item) string { - // analogous to gofeed.Feed.String - json, _ := json.MarshalIndent(item, "", " ") - return string(json) -} - -func (feed *Feed) filterItems() []item { - if feed.filter == nil { - return feed.items - } - - items := make([]item, 0, len(feed.items)) - - for _, item := range feed.items { - res, err := feed.filter.Run(item.Item) - if err != nil { - log.Errorf("Feed %s: Item %s: Error applying item filter: %s", feed.Name, printItem(item.Item), err) - res = true // include - } - - if res { - if log.IsDebug() { - log.Debugf("Filter '%s' matches for item %s", feed.ItemFilter, printItem(item.Item)) - } - items = append(items, item) - } else if log.IsDebug() { // printItem is not for free - log.Debugf("Filter '%s' does not match for item %s", feed.ItemFilter, printItem(item.Item)) - } - } - return items -} - -func filterFeed(feed *Feed) { - if len(feed.items) > 0 { - origLen := len(feed.items) - - log.Debugf("Filtering %s. Starting with %d items", feed.Name, origLen) - - items := feed.filterItems() - newLen := len(items) - if newLen < origLen { - log.Printf("Item filter on %s: Reduced from %d to %d items.", feed.Name, origLen, newLen) - origLen = newLen - } - - items = feed.cached.filterItems(items, feed.IgnHash, feed.AlwaysNew) - feed.items = items - - newLen = len(feed.items) - if newLen < origLen { - log.Printf("Filtered %s. Reduced from %d to %d items.", feed.Name, origLen, newLen) - } else { - log.Printf("Filtered %s, no reduction.", feed.Name) - } - - } else { - log.Debugf("No items for %s. No filtering.", feed.Name) - } -} - -func (state *State) Filter() { - if log.IsDebug() { - // single threaded for better output - state.Foreach(filterFeed) - } else { - state.ForeachGo(filterFeed) - } -} - -func NewState(cfg *config.Config) (*State, error) { - state := State{ - feeds: map[string]*Feed{}, - cache: Cache{}, // loaded later on - cfg: cfg, - } - - for name, parsedFeed := range cfg.Feeds { - feed, err := Create(parsedFeed, cfg.GlobalOptions) - if err != nil { - return nil, err - } - state.feeds[name] = feed - } - - return &state, nil -} - -func (state *State) RemoveUndue() { - for name, feed := range state.feeds { - if feed.Disable || !feed.NeedsUpdate(feed.cached.Last()) { - delete(state.feeds, name) - } - } -} - -func (state *State) NumFeeds() int { - return len(state.feeds) -} diff --git a/internal/feed/template/html.tpl b/internal/feed/template/html.tpl index aa7d341..ed98121 100644 --- a/internal/feed/template/html.tpl +++ b/internal/feed/template/html.tpl @@ -1,4 +1,4 @@ -{{- /*gotype:github.com/Necoro/feed2imap-go/internal/feed.feeditem*/ -}} +{{- /*gotype:github.com/Necoro/feed2imap-go/internal/feed.Item*/ -}} {{define "bottomLine"}} {{if .content}} @@ -16,7 +16,7 @@ Feed {{with .Feed.Link}}{{end}} - {{or .Feed.Title .Feed.Link "Unnammed feed"}} + {{or .Feed.Title .Feed.Link "Unnamed feed"}} {{if .Feed.Link}}{{end}} diff --git a/internal/feed/template/text.tpl b/internal/feed/template/text.tpl index 1225902..8ca30ed 100644 --- a/internal/feed/template/text.tpl +++ b/internal/feed/template/text.tpl @@ -1,4 +1,4 @@ -{{- /*gotype:github.com/Necoro/feed2imap-go/internal/feed.feeditem*/ -}} +{{- /*gotype:github.com/Necoro/feed2imap-go/internal/feed.Item*/ -}} {{- with .Item.Link -}} <{{.}}> diff --git a/main.go b/main.go index abc80fa..86a6ddf 100644 --- a/main.go +++ b/main.go @@ -5,7 +5,7 @@ import ( "fmt" "os" - "github.com/Necoro/feed2imap-go/internal/feed" + "github.com/Necoro/feed2imap-go/internal/feed/cache" "github.com/Necoro/feed2imap-go/internal/imap" "github.com/Necoro/feed2imap-go/pkg/config" "github.com/Necoro/feed2imap-go/pkg/log" @@ -33,7 +33,8 @@ func init() { flag.BoolVar(&debug, "d", debug, "enable debug output") } -func processFeed(feed *feed.Feed, client *imap.Client, dryRun bool) { +func processFeed(cf cache.CachedFeed, client *imap.Client, dryRun bool) { + feed := cf.Feed() msgs, err := feed.Messages() if err != nil { log.Errorf("Processing items of feed %s: %s", feed.Name, err) @@ -41,7 +42,7 @@ func processFeed(feed *feed.Feed, client *imap.Client, dryRun bool) { } if dryRun || len(msgs) == 0 { - feed.MarkSuccess() + cf.Commit() return } @@ -58,7 +59,7 @@ func processFeed(feed *feed.Feed, client *imap.Client, dryRun bool) { log.Printf("Uploaded %d messages to '%s' @ %s", len(msgs), feed.Name, folder) - feed.MarkSuccess() + cf.Commit() } func run() error { @@ -81,7 +82,7 @@ func run() error { return err } - state, err := feed.NewState(cfg) + state, err := cache.NewState(cfg) if err != nil { return err } @@ -116,9 +117,9 @@ func run() error { } if buildCache { - state.Foreach((*feed.Feed).MarkSuccess) + state.Foreach(cache.CachedFeed.Commit) } else { - state.ForeachGo(func(f *feed.Feed) { + state.ForeachGo(func(f cache.CachedFeed) { processFeed(f, c, dryRun) }) } diff --git a/tools/print-cache/print-cache.go b/tools/print-cache/print-cache.go index cdecac4..b3be0bf 100644 --- a/tools/print-cache/print-cache.go +++ b/tools/print-cache/print-cache.go @@ -5,7 +5,7 @@ import ( "fmt" "log" - "github.com/Necoro/feed2imap-go/internal/feed" + "github.com/Necoro/feed2imap-go/internal/feed/cache" ) // flags @@ -22,7 +22,7 @@ func init() { func main() { flag.Parse() - cache, err := feed.LoadCache(cacheFile) + cache, err := cache.LoadCache(cacheFile) if err != nil { log.Fatal(err) } -- cgit v1.2.3