From 8bd60a58bbaf20f576e48bdb1d8e6323dfd31278 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=9F=E9=85=8C=20=E9=B5=AC=E5=85=84?= Date: Sun, 1 Mar 2026 19:22:15 +0800 Subject: [PATCH] Validate data before commit --- datasources/kmb/query.go | 74 ++++---- datasources/kmb/routestops.go | 24 ++- utils/cache.go | 318 +++++++++++++++++++--------------- 3 files changed, 231 insertions(+), 185 deletions(-) diff --git a/datasources/kmb/query.go b/datasources/kmb/query.go index 5335339..2ab81c0 100644 --- a/datasources/kmb/query.go +++ b/datasources/kmb/query.go @@ -1,49 +1,53 @@ package kmb import ( - "strings" - query "github.com/tgckpg/golifehk/query" + query "github.com/tgckpg/golifehk/query" + "log" + "strings" ) -func Query( lang string, message string ) query.IQueryResult { +func Query(lang string, message string) query.IQueryResult { - var qo *query.QueryObject - var err error + var qo *query.QueryObject + var err error - qr := QueryResult{ Lang: lang } - routeStops, err := getRouteStops() - if err != nil { - qr.Error = err - goto qrReturn - } + qr := QueryResult{Lang: lang} + routeStops, err := getRouteStops() + if err != nil { + qr.Error = err + goto qrReturn + } - qo, err = query.Parse( strings.ToUpper( message ), routeStops ) - if err != nil { - qr.Error = err - goto qrReturn - } + qo, err = query.Parse(strings.ToUpper(message), routeStops) + if err != nil { + qr.Error = err + goto qrReturn + } - qr.Query = qo + qr.Query = qo - if 0 < len( *qo.Results ) && 1 < len( *qo.SearchTerms ) { + if 0 < len(*qo.Results) && 1 < len(*qo.SearchTerms) { - rSchedules := map[*RouteStop] *[] *Schedule{} - for _, item := range *qo.Results { - var r *RouteStop - r = any( item ).( *RouteStop ) - schedules, err := getSchedule( r ) - if err != nil { - qr.Error = err - break - } + rSchedules := map[*RouteStop]*[]*Schedule{} + for _, item := range *qo.Results { + var r *RouteStop + r = any(item).(*RouteStop) + schedules, err := getSchedule(r) + if err != nil { + qr.Error = err + break + } - rSchedules[r] = schedules - } - qr.Schedules = &rSchedules - } + rSchedules[r] = schedules + } + qr.Schedules = &rSchedules + } - qrReturn: - var iqr query.IQueryResult - iqr = &qr - return iqr +qrReturn: + if qr.Error != nil { + log.Println(qr.Error) + } + var iqr query.IQueryResult + iqr = &qr + return iqr } diff --git a/datasources/kmb/routestops.go b/datasources/kmb/routestops.go index b5bb91d..2bfb63a 100644 --- a/datasources/kmb/routestops.go +++ b/datasources/kmb/routestops.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "io" - "log" "path/filepath" query "github.com/tgckpg/golifehk/query" @@ -99,33 +98,46 @@ func readBusStopsData(buff *bytes.Buffer) (*map[string]*BusStop, error) { } func getRouteStops() (*[]query.ISearchable, error) { + var busStopMap *map[string]*BusStop QUERY_FUNC := func() (io.ReadCloser, error) { return utils.HttpGet("https://data.etabus.gov.hk/v1/transport/kmb/stop") } - buff, err := utils.CacheStream(JSON_BUSSTOPS, QUERY_FUNC, 7*24*3600) + PARSE_FUNC := func(buff *bytes.Buffer) error { + var err error + busStopMap, err = readBusStopsData(buff) + return err + } + + cs, err := utils.CacheStreamEx(JSON_BUSSTOPS, QUERY_FUNC) if err != nil { return nil, err } - busStopMap, err := readBusStopsData(buff) + err = cs.Try(PARSE_FUNC, 7*24*3600, 3) if err != nil { return nil, err } + var routeStops *[]*RouteStop QUERY_FUNC = func() (io.ReadCloser, error) { return utils.HttpGet("https://data.etabus.gov.hk/v1/transport/kmb/route-stop") } - buff, err = utils.CacheStream(JSON_ROUTESTOPS, QUERY_FUNC, 7*24*3600) + PARSE_FUNC = func(buff *bytes.Buffer) error { + var err error + routeStops, err = readRouteStopsData(busStopMap, buff) + return err + } + + cs, err = utils.CacheStreamEx(JSON_ROUTESTOPS, QUERY_FUNC) if err != nil { return nil, err } - routeStops, err := readRouteStopsData(busStopMap, buff) + err = cs.Try(PARSE_FUNC, 7*24*3600, 3) if err != nil { - log.Printf("Unable to parse RouteStopsData: %s", err) return nil, err } diff --git a/utils/cache.go b/utils/cache.go index e2131ca..c0ecbcd 100644 --- a/utils/cache.go +++ b/utils/cache.go @@ -1,188 +1,218 @@ package utils import ( - "bytes" - "io" - "log" - "os" - "path/filepath" - "time" + "bytes" + "fmt" + "io" + "log" + "os" + "path/filepath" + "time" ) type CacheStreams struct { - Local *bytes.Buffer - Remote *bytes.Buffer - Path string - Query func() ( io.ReadCloser, error ) + Local *bytes.Buffer + Remote *bytes.Buffer + Path string + Query func() (io.ReadCloser, error) } -func ( cs *CacheStreams ) Commit() error { +func (cs *CacheStreams) Commit() error { - f, err := os.OpenFile( cs.Path, os.O_CREATE | os.O_WRONLY | os.O_TRUNC, 0644 ) - if err != nil { - return err - } + f, err := os.OpenFile(cs.Path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) + if err != nil { + return err + } - defer f.Close() + defer f.Close() - _, err = io.Copy( f, bytes.NewReader( cs.Remote.Bytes() ) ) - log.Printf( "Commit: %s", cs.Path ) - return err + _, err = io.Copy(f, bytes.NewReader(cs.Remote.Bytes())) + log.Printf("Commit: %s", cs.Path) + return err } -func ( cs *CacheStreams ) Reload() error { +func (cs *CacheStreams) Try(Parser func(*bytes.Buffer) error, expires time.Duration, retries int) error { + var err error + for i := 0; i < retries; i++ { + if cs.NotExpired(expires) { + err = Parser(cs.Local) + if err == nil { + log.Printf("Using cache: %s", cs.Path) + return nil + } + } - s, err := cs.Query() - if err != nil { - return err - } + err = cs.Reload() + log.Printf("Reloading (%d): %s", i+1, cs.Path) + if err != nil { + log.Printf("Error retrieving data(%s): %s", i+1, err) + } - defer s.Close() + if cs.Remote != nil { + err = Parser(cs.Remote) + if err == nil { + cs.Commit() + return nil + } + } + } - cs.Remote = bytes.NewBuffer( []byte{} ) - _, err = io.Copy( cs.Remote, s ) - - return err + return fmt.Errorf("Failed to get data for: %s", cs.Path) } -func ( cs *CacheStreams ) NotExpired( expires time.Duration ) bool { +func (cs *CacheStreams) Reload() error { - cache, err := os.Stat( cs.Path ) + s, err := cs.Query() + if err != nil { + return err + } - if err == nil { - expired := cache.ModTime().Add( expires * 1e9 ) - return time.Now().Before( expired ) - } + defer s.Close() - return false + cs.Remote = bytes.NewBuffer([]byte{}) + _, err = io.Copy(cs.Remote, s) + + return err } -func CacheStreamEx( path string, readStream func() ( io.ReadCloser, error ) ) ( *CacheStreams, error ) { +func (cs *CacheStreams) NotExpired(expires time.Duration) bool { - cs := CacheStreams{} - cs.Path = path - cs.Query = readStream + cache, err := os.Stat(cs.Path) - f, err := os.Open( path ) - if err == nil { - defer f.Close() - cs.Local = bytes.NewBuffer( []byte{} ) - _, err = io.Copy( cs.Local, f ) - if err != nil { - return nil, err - } - } + if err == nil { + expired := cache.ModTime().Add(expires * 1e9) + return time.Now().Before(expired) + } - return &cs, nil + return false } +func CacheStreamEx(path string, readStream func() (io.ReadCloser, error)) (*CacheStreams, error) { -func CacheStream( path string, readStream func() ( io.ReadCloser, error ), expires time.Duration ) ( *bytes.Buffer, error ) { + cs := CacheStreams{} + cs.Path = path + cs.Query = readStream - cache, err := os.Stat( path ) + f, err := os.Open(path) + if err == nil { + defer f.Close() + cs.Local = bytes.NewBuffer([]byte{}) + _, err = io.Copy(cs.Local, f) + if err != nil { + return nil, err + } + } - // Check if cache exists and not expired - if err == nil { - expired := cache.ModTime().Add( expires * 1e9 ) - if time.Now().Before( expired ) { - f, err := os.Open( path ) - if err == nil { - defer f.Close() - log.Printf( "Using cache: %s", path ) - writeBuff := bytes.NewBuffer( []byte{} ) - _, err = io.Copy( writeBuff, f ) - if err == nil { - return writeBuff, nil - } - } - } - } - - err = os.MkdirAll( filepath.Dir( path ), 0750 ) - if err != nil { - return nil, err } - - writeBuff := bytes.NewBuffer( []byte{} ) - - // Get the reader that return new data - s, err := readStream() - if err != nil { - return nil, err - } - - defer s.Close() - - _, err = io.Copy( writeBuff, s ) - if err != nil { - return nil, err - } - - f, err := os.OpenFile( path, os.O_CREATE | os.O_WRONLY | os.O_TRUNC, 0644 ) - if err != nil { - return nil, err - } - - defer f.Close() - - data := writeBuff.Bytes() - _, err = io.Copy( f, bytes.NewReader( data ) ) - if err != nil { - return nil, err - } - - return writeBuff, nil + return &cs, nil } -func ChangedStream( path string, readStream func() ( io.Reader, error ), dataModTime time.Time ) ( *bytes.Buffer, error ) { +func CacheStream(path string, readStream func() (io.ReadCloser, error), expires time.Duration) (*bytes.Buffer, error) { - cache, err := os.Stat( path ) + cache, err := os.Stat(path) - // Check if cache exists and not expired - if err == nil { - if dataModTime.Before( cache.ModTime() ) { - f, err := os.Open( path ) - if err == nil { - defer f.Close() - log.Printf( "Reading from file: %s", path ) - writeBuff := bytes.NewBuffer( []byte{} ) - _, err = io.Copy( writeBuff, f ) - if err == nil { - return writeBuff, nil - } - } - } - } + // Check if cache exists and not expired + if err == nil { + expired := cache.ModTime().Add(expires * 1e9) + if time.Now().Before(expired) { + f, err := os.Open(path) + if err == nil { + defer f.Close() + log.Printf("Using cache: %s", path) + writeBuff := bytes.NewBuffer([]byte{}) + _, err = io.Copy(writeBuff, f) + if err == nil { + return writeBuff, nil + } + } + } + } - err = os.MkdirAll( filepath.Dir( path ), 0750 ) - if err != nil { - return nil, err - } + err = os.MkdirAll(filepath.Dir(path), 0750) + if err != nil { + return nil, err + } - writeBuff := bytes.NewBuffer( []byte{} ) + writeBuff := bytes.NewBuffer([]byte{}) - // Get the reader that return new data - s, err := readStream() - if err != nil { - return nil, err - } + // Get the reader that return new data + s, err := readStream() + if err != nil { + return nil, err + } - _, err = io.Copy( writeBuff, s ) - if err != nil { - return nil, err - } + defer s.Close() - f, err := os.OpenFile( path, os.O_CREATE | os.O_WRONLY | os.O_TRUNC, 0644 ) - if err != nil { - return nil, err - } + _, err = io.Copy(writeBuff, s) + if err != nil { + return nil, err + } - defer f.Close() + f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) + if err != nil { + return nil, err + } - data := writeBuff.Bytes() - _, err = io.Copy( f, bytes.NewReader( data ) ) - if err != nil { - return nil, err - } + defer f.Close() - return writeBuff, nil + data := writeBuff.Bytes() + _, err = io.Copy(f, bytes.NewReader(data)) + if err != nil { + return nil, err + } + + return writeBuff, nil +} + +func ChangedStream(path string, readStream func() (io.Reader, error), dataModTime time.Time) (*bytes.Buffer, error) { + + cache, err := os.Stat(path) + + // Check if cache exists and not expired + if err == nil { + if dataModTime.Before(cache.ModTime()) { + f, err := os.Open(path) + if err == nil { + defer f.Close() + log.Printf("Reading from file: %s", path) + writeBuff := bytes.NewBuffer([]byte{}) + _, err = io.Copy(writeBuff, f) + if err == nil { + return writeBuff, nil + } + } + } + } + + err = os.MkdirAll(filepath.Dir(path), 0750) + if err != nil { + return nil, err + } + + writeBuff := bytes.NewBuffer([]byte{}) + + // Get the reader that return new data + s, err := readStream() + if err != nil { + return nil, err + } + + _, err = io.Copy(writeBuff, s) + if err != nil { + return nil, err + } + + f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) + if err != nil { + return nil, err + } + + defer f.Close() + + data := writeBuff.Bytes() + _, err = io.Copy(f, bytes.NewReader(data)) + if err != nil { + return nil, err + } + + return writeBuff, nil }