Validate data before commit

This commit is contained in:
2026-03-01 19:22:15 +08:00
parent 6245ef6290
commit 8bd60a58bb
3 changed files with 231 additions and 185 deletions

View File

@@ -1,23 +1,24 @@
package kmb package kmb
import ( import (
"strings"
query "github.com/tgckpg/golifehk/query" query "github.com/tgckpg/golifehk/query"
"log"
"strings"
) )
func Query( lang string, message string ) query.IQueryResult { func Query(lang string, message string) query.IQueryResult {
var qo *query.QueryObject var qo *query.QueryObject
var err error var err error
qr := QueryResult{ Lang: lang } qr := QueryResult{Lang: lang}
routeStops, err := getRouteStops() routeStops, err := getRouteStops()
if err != nil { if err != nil {
qr.Error = err qr.Error = err
goto qrReturn goto qrReturn
} }
qo, err = query.Parse( strings.ToUpper( message ), routeStops ) qo, err = query.Parse(strings.ToUpper(message), routeStops)
if err != nil { if err != nil {
qr.Error = err qr.Error = err
goto qrReturn goto qrReturn
@@ -25,13 +26,13 @@ func Query( lang string, message string ) query.IQueryResult {
qr.Query = qo qr.Query = qo
if 0 < len( *qo.Results ) && 1 < len( *qo.SearchTerms ) { if 0 < len(*qo.Results) && 1 < len(*qo.SearchTerms) {
rSchedules := map[*RouteStop] *[] *Schedule{} rSchedules := map[*RouteStop]*[]*Schedule{}
for _, item := range *qo.Results { for _, item := range *qo.Results {
var r *RouteStop var r *RouteStop
r = any( item ).( *RouteStop ) r = any(item).(*RouteStop)
schedules, err := getSchedule( r ) schedules, err := getSchedule(r)
if err != nil { if err != nil {
qr.Error = err qr.Error = err
break break
@@ -42,7 +43,10 @@ func Query( lang string, message string ) query.IQueryResult {
qr.Schedules = &rSchedules qr.Schedules = &rSchedules
} }
qrReturn: qrReturn:
if qr.Error != nil {
log.Println(qr.Error)
}
var iqr query.IQueryResult var iqr query.IQueryResult
iqr = &qr iqr = &qr
return iqr return iqr

View File

@@ -5,7 +5,6 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"log"
"path/filepath" "path/filepath"
query "github.com/tgckpg/golifehk/query" query "github.com/tgckpg/golifehk/query"
@@ -99,33 +98,46 @@ func readBusStopsData(buff *bytes.Buffer) (*map[string]*BusStop, error) {
} }
func getRouteStops() (*[]query.ISearchable, error) { func getRouteStops() (*[]query.ISearchable, error) {
var busStopMap *map[string]*BusStop
QUERY_FUNC := func() (io.ReadCloser, error) { QUERY_FUNC := func() (io.ReadCloser, error) {
return utils.HttpGet("https://data.etabus.gov.hk/v1/transport/kmb/stop") return utils.HttpGet("https://data.etabus.gov.hk/v1/transport/kmb/stop")
} }
buff, err := utils.CacheStream(JSON_BUSSTOPS, QUERY_FUNC, 7*24*3600) PARSE_FUNC := func(buff *bytes.Buffer) error {
var err error
busStopMap, err = readBusStopsData(buff)
return err
}
cs, err := utils.CacheStreamEx(JSON_BUSSTOPS, QUERY_FUNC)
if err != nil { if err != nil {
return nil, err return nil, err
} }
busStopMap, err := readBusStopsData(buff) err = cs.Try(PARSE_FUNC, 7*24*3600, 3)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var routeStops *[]*RouteStop
QUERY_FUNC = func() (io.ReadCloser, error) { QUERY_FUNC = func() (io.ReadCloser, error) {
return utils.HttpGet("https://data.etabus.gov.hk/v1/transport/kmb/route-stop") return utils.HttpGet("https://data.etabus.gov.hk/v1/transport/kmb/route-stop")
} }
buff, err = utils.CacheStream(JSON_ROUTESTOPS, QUERY_FUNC, 7*24*3600) PARSE_FUNC = func(buff *bytes.Buffer) error {
var err error
routeStops, err = readRouteStopsData(busStopMap, buff)
return err
}
cs, err = utils.CacheStreamEx(JSON_ROUTESTOPS, QUERY_FUNC)
if err != nil { if err != nil {
return nil, err return nil, err
} }
routeStops, err := readRouteStopsData(busStopMap, buff) err = cs.Try(PARSE_FUNC, 7*24*3600, 3)
if err != nil { if err != nil {
log.Printf("Unable to parse RouteStopsData: %s", err)
return nil, err return nil, err
} }

View File

@@ -2,6 +2,7 @@ package utils
import ( import (
"bytes" "bytes"
"fmt"
"io" "io"
"log" "log"
"os" "os"
@@ -13,24 +14,53 @@ type CacheStreams struct {
Local *bytes.Buffer Local *bytes.Buffer
Remote *bytes.Buffer Remote *bytes.Buffer
Path string Path string
Query func() ( io.ReadCloser, error ) Query func() (io.ReadCloser, error)
} }
func ( cs *CacheStreams ) Commit() error { func (cs *CacheStreams) Commit() error {
f, err := os.OpenFile( cs.Path, os.O_CREATE | os.O_WRONLY | os.O_TRUNC, 0644 ) f, err := os.OpenFile(cs.Path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil { if err != nil {
return err return err
} }
defer f.Close() defer f.Close()
_, err = io.Copy( f, bytes.NewReader( cs.Remote.Bytes() ) ) _, err = io.Copy(f, bytes.NewReader(cs.Remote.Bytes()))
log.Printf( "Commit: %s", cs.Path ) log.Printf("Commit: %s", cs.Path)
return err return err
} }
func ( cs *CacheStreams ) Reload() error { func (cs *CacheStreams) Try(Parser func(*bytes.Buffer) error, expires time.Duration, retries int) error {
var err error
for i := 0; i < retries; i++ {
if cs.NotExpired(expires) {
err = Parser(cs.Local)
if err == nil {
log.Printf("Using cache: %s", cs.Path)
return nil
}
}
err = cs.Reload()
log.Printf("Reloading (%d): %s", i+1, cs.Path)
if err != nil {
log.Printf("Error retrieving data(%s): %s", i+1, err)
}
if cs.Remote != nil {
err = Parser(cs.Remote)
if err == nil {
cs.Commit()
return nil
}
}
}
return fmt.Errorf("Failed to get data for: %s", cs.Path)
}
func (cs *CacheStreams) Reload() error {
s, err := cs.Query() s, err := cs.Query()
if err != nil { if err != nil {
@@ -39,35 +69,35 @@ func ( cs *CacheStreams ) Reload() error {
defer s.Close() defer s.Close()
cs.Remote = bytes.NewBuffer( []byte{} ) cs.Remote = bytes.NewBuffer([]byte{})
_, err = io.Copy( cs.Remote, s ) _, err = io.Copy(cs.Remote, s)
return err return err
} }
func ( cs *CacheStreams ) NotExpired( expires time.Duration ) bool { func (cs *CacheStreams) NotExpired(expires time.Duration) bool {
cache, err := os.Stat( cs.Path ) cache, err := os.Stat(cs.Path)
if err == nil { if err == nil {
expired := cache.ModTime().Add( expires * 1e9 ) expired := cache.ModTime().Add(expires * 1e9)
return time.Now().Before( expired ) return time.Now().Before(expired)
} }
return false return false
} }
func CacheStreamEx( path string, readStream func() ( io.ReadCloser, error ) ) ( *CacheStreams, error ) { func CacheStreamEx(path string, readStream func() (io.ReadCloser, error)) (*CacheStreams, error) {
cs := CacheStreams{} cs := CacheStreams{}
cs.Path = path cs.Path = path
cs.Query = readStream cs.Query = readStream
f, err := os.Open( path ) f, err := os.Open(path)
if err == nil { if err == nil {
defer f.Close() defer f.Close()
cs.Local = bytes.NewBuffer( []byte{} ) cs.Local = bytes.NewBuffer([]byte{})
_, err = io.Copy( cs.Local, f ) _, err = io.Copy(cs.Local, f)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -76,21 +106,20 @@ func CacheStreamEx( path string, readStream func() ( io.ReadCloser, error ) ) (
return &cs, nil return &cs, nil
} }
func CacheStream(path string, readStream func() (io.ReadCloser, error), expires time.Duration) (*bytes.Buffer, error) {
func CacheStream( path string, readStream func() ( io.ReadCloser, error ), expires time.Duration ) ( *bytes.Buffer, error ) { cache, err := os.Stat(path)
cache, err := os.Stat( path )
// Check if cache exists and not expired // Check if cache exists and not expired
if err == nil { if err == nil {
expired := cache.ModTime().Add( expires * 1e9 ) expired := cache.ModTime().Add(expires * 1e9)
if time.Now().Before( expired ) { if time.Now().Before(expired) {
f, err := os.Open( path ) f, err := os.Open(path)
if err == nil { if err == nil {
defer f.Close() defer f.Close()
log.Printf( "Using cache: %s", path ) log.Printf("Using cache: %s", path)
writeBuff := bytes.NewBuffer( []byte{} ) writeBuff := bytes.NewBuffer([]byte{})
_, err = io.Copy( writeBuff, f ) _, err = io.Copy(writeBuff, f)
if err == nil { if err == nil {
return writeBuff, nil return writeBuff, nil
} }
@@ -98,11 +127,12 @@ func CacheStream( path string, readStream func() ( io.ReadCloser, error ), expir
} }
} }
err = os.MkdirAll( filepath.Dir( path ), 0750 ) err = os.MkdirAll(filepath.Dir(path), 0750)
if err != nil { if err != nil {
return nil, err } return nil, err
}
writeBuff := bytes.NewBuffer( []byte{} ) writeBuff := bytes.NewBuffer([]byte{})
// Get the reader that return new data // Get the reader that return new data
s, err := readStream() s, err := readStream()
@@ -112,12 +142,12 @@ func CacheStream( path string, readStream func() ( io.ReadCloser, error ), expir
defer s.Close() defer s.Close()
_, err = io.Copy( writeBuff, s ) _, err = io.Copy(writeBuff, s)
if err != nil { if err != nil {
return nil, err return nil, err
} }
f, err := os.OpenFile( path, os.O_CREATE | os.O_WRONLY | os.O_TRUNC, 0644 ) f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -125,7 +155,7 @@ func CacheStream( path string, readStream func() ( io.ReadCloser, error ), expir
defer f.Close() defer f.Close()
data := writeBuff.Bytes() data := writeBuff.Bytes()
_, err = io.Copy( f, bytes.NewReader( data ) ) _, err = io.Copy(f, bytes.NewReader(data))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -133,19 +163,19 @@ func CacheStream( path string, readStream func() ( io.ReadCloser, error ), expir
return writeBuff, nil return writeBuff, nil
} }
func ChangedStream( path string, readStream func() ( io.Reader, error ), dataModTime time.Time ) ( *bytes.Buffer, error ) { func ChangedStream(path string, readStream func() (io.Reader, error), dataModTime time.Time) (*bytes.Buffer, error) {
cache, err := os.Stat( path ) cache, err := os.Stat(path)
// Check if cache exists and not expired // Check if cache exists and not expired
if err == nil { if err == nil {
if dataModTime.Before( cache.ModTime() ) { if dataModTime.Before(cache.ModTime()) {
f, err := os.Open( path ) f, err := os.Open(path)
if err == nil { if err == nil {
defer f.Close() defer f.Close()
log.Printf( "Reading from file: %s", path ) log.Printf("Reading from file: %s", path)
writeBuff := bytes.NewBuffer( []byte{} ) writeBuff := bytes.NewBuffer([]byte{})
_, err = io.Copy( writeBuff, f ) _, err = io.Copy(writeBuff, f)
if err == nil { if err == nil {
return writeBuff, nil return writeBuff, nil
} }
@@ -153,12 +183,12 @@ func ChangedStream( path string, readStream func() ( io.Reader, error ), dataMod
} }
} }
err = os.MkdirAll( filepath.Dir( path ), 0750 ) err = os.MkdirAll(filepath.Dir(path), 0750)
if err != nil { if err != nil {
return nil, err return nil, err
} }
writeBuff := bytes.NewBuffer( []byte{} ) writeBuff := bytes.NewBuffer([]byte{})
// Get the reader that return new data // Get the reader that return new data
s, err := readStream() s, err := readStream()
@@ -166,12 +196,12 @@ func ChangedStream( path string, readStream func() ( io.Reader, error ), dataMod
return nil, err return nil, err
} }
_, err = io.Copy( writeBuff, s ) _, err = io.Copy(writeBuff, s)
if err != nil { if err != nil {
return nil, err return nil, err
} }
f, err := os.OpenFile( path, os.O_CREATE | os.O_WRONLY | os.O_TRUNC, 0644 ) f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -179,7 +209,7 @@ func ChangedStream( path string, readStream func() ( io.Reader, error ), dataMod
defer f.Close() defer f.Close()
data := writeBuff.Bytes() data := writeBuff.Bytes()
_, err = io.Copy( f, bytes.NewReader( data ) ) _, err = io.Copy(f, bytes.NewReader(data))
if err != nil { if err != nil {
return nil, err return nil, err
} }