Skip to content

openfoodfacts_proxy.routes.cgi.reference

[docs] module openfoodfacts_proxy.routes.cgi.reference

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
from fastapi import APIRouter, Query, Request
from fastapi.responses import Response

from openfoodfacts_proxy.core.application_container import ApplicationContainer
from openfoodfacts_proxy.routes.common import ALL_HTTP_METHODS

# Router on which the legacy ``/cgi/*`` route handlers of this module are registered.
router = APIRouter()


@router.get("/cgi/search.pl", response_model=None)
async def search_legacy(
    request: Request,
    search_terms: str | None = Query(None),
    code: str | None = Query(None),
    brands: str | None = Query(None, alias="tagtype_0"),
    page: int = Query(1),
    page_size: int = Query(24),
    json: int = Query(1),
) -> Response:
    """Serve legacy search locally when supported and proxy the rest upstream."""
    # ``brands`` (query key ``tagtype_0``) and ``json`` are declared in the
    # signature but are not forwarded as keyword arguments; the handler still
    # receives the full ``request``. Drop the unused bindings up front.
    del json
    del brands

    search_handler = ApplicationContainer.from_app(request.app).cgi_search_handler
    forwarded = {
        "search_terms": search_terms,
        "code": code,
        "page": page,
        "page_size": page_size,
    }
    return await search_handler.handle(request, **forwarded)


# NOTE: FastAPI publishes a handler's docstring as its OpenAPI description.
# Handlers below that originally had no docstring are therefore documented
# with ``#`` comments so the generated schema stays unchanged.


@router.post("/cgi/search.pl", response_model=None, operation_id="search_legacy_post")
async def search_legacy_post(request: Request) -> Response:
    """Proxy legacy CGI POST search explicitly without write-through reconciliation."""
    container = ApplicationContainer.from_app(request.app)
    return await container.documented_write_handler.handle_legacy_search_post(request)


@router.post("/cgi/product_image_upload.pl")
async def product_image_upload(request: Request) -> Response:
    # Delegates to documented_write_handler.handle_product_image_upload.
    container = ApplicationContainer.from_app(request.app)
    return await container.documented_write_handler.handle_product_image_upload(request)


@router.get("/cgi/ingredients.pl")
async def ingredients(request: Request) -> Response:
    # GET variant: delegates to reference_get_handler.handle.
    container = ApplicationContainer.from_app(request.app)
    return await container.reference_get_handler.handle(request)


@router.post("/cgi/ingredients.pl", response_model=None, operation_id="ingredients_post")
async def ingredients_post(request: Request) -> Response:
    # POST variant: delegates to documented_write_handler.handle_extract_ingredients.
    container = ApplicationContainer.from_app(request.app)
    return await container.documented_write_handler.handle_extract_ingredients(request)


@router.post("/cgi/packaging.pl", response_model=None, operation_id="packaging_post")
async def packaging_post(request: Request) -> Response:
    # Delegates to documented_write_handler.handle_extract_packaging.
    container = ApplicationContainer.from_app(request.app)
    return await container.documented_write_handler.handle_extract_packaging(request)


@router.get("/cgi/product_image_crop.pl", operation_id="product_image_crop_get")
async def product_image_crop_get(request: Request) -> Response:
    # GET and POST share the same handler method (handle_product_image_crop);
    # they are separate functions so each gets its own operation_id.
    container = ApplicationContainer.from_app(request.app)
    return await container.documented_write_handler.handle_product_image_crop(request)


@router.post("/cgi/product_image_crop.pl", operation_id="product_image_crop_post")
async def product_image_crop_post(request: Request) -> Response:
    # POST counterpart of product_image_crop_get; same handler method.
    container = ApplicationContainer.from_app(request.app)
    return await container.documented_write_handler.handle_product_image_crop(request)


@router.post("/cgi/product_image_unselect.pl")
async def product_image_unselect(request: Request) -> Response:
    # Delegates to documented_write_handler.handle_product_image_unselect.
    container = ApplicationContainer.from_app(request.app)
    return await container.documented_write_handler.handle_product_image_unselect(request)


@router.post("/cgi/product_jqm2.pl")
async def product_jqm2(request: Request) -> Response:
    # Delegates to documented_write_handler.handle_product_update.
    container = ApplicationContainer.from_app(request.app)
    return await container.documented_write_handler.handle_product_update(request)


@router.get("/cgi/suggest.pl")
async def suggest(request: Request) -> Response:
    # Delegates to reference_get_handler.handle.
    container = ApplicationContainer.from_app(request.app)
    return await container.reference_get_handler.handle(request)


@router.get("/cgi/nutrients.pl")
async def nutrients(request: Request) -> Response:
    # Delegates to reference_get_handler.handle.
    container = ApplicationContainer.from_app(request.app)
    return await container.reference_get_handler.handle(request)


@router.post("/cgi/session.pl")
async def session(request: Request) -> Response:
    # Delegates to documented_write_handler.handle_session.
    container = ApplicationContainer.from_app(request.app)
    return await container.documented_write_handler.handle_session(request)


@router.post("/cgi/auth.pl", response_model=None, operation_id="auth_post")
async def auth(request: Request) -> Response:
    """Proxy legacy CGI authentication explicitly without write-through reconciliation."""
    container = ApplicationContainer.from_app(request.app)
    return await container.documented_write_handler.handle_auth(request)


@router.post("/cgi/user.pl", response_model=None, operation_id="user_post")
async def user(request: Request) -> Response:
    """Proxy legacy CGI user registration explicitly without write-through reconciliation."""
    container = ApplicationContainer.from_app(request.app)
    return await container.documented_write_handler.handle_register(request)


@router.post("/cgi/reset_password.pl", response_model=None, operation_id="reset_password_post")
async def reset_password(request: Request) -> Response:
    """Proxy legacy CGI password reset explicitly without write-through reconciliation."""
    container = ApplicationContainer.from_app(request.app)
    return await container.documented_write_handler.handle_reset_password(request)


@router.api_route("/cgi/{path:path}", methods=ALL_HTTP_METHODS, include_in_schema=False)
async def redirect_unhandled_cgi(request: Request, path: str) -> Response:
    # Catch-all for every /cgi/ path, excluded from the OpenAPI schema.
    # It is declared after the specific routes in this module; keep it last so
    # those routes are matched first.
    # ``path`` is only captured to make the route match and is not used.
    del path
    container = ApplicationContainer.from_app(request.app)
    # GET requests go to the reference GET handler; every other method is
    # handed to the passthrough handler.
    if request.method.upper() == "GET":
        return await container.reference_get_handler.handle(request)
    return await container.passthrough_handler.handle(request)