Source code for searx.plugins.calculator

# SPDX-License-Identifier: AGPL-3.0-or-later
"""Calculate mathematical expressions using :py:obj:`ast.parse` (mode="eval").
"""

from __future__ import annotations
import typing

import ast
import logging
import multiprocessing
import operator
import re

import babel
import babel.numbers
from flask_babel import gettext

from searx.result_types import EngineResults
from searx.plugins import Plugin, PluginInfo

if typing.TYPE_CHECKING:
    from searx.search import SearchWithPlugins
    from searx.extended_types import SXNG_Request
    from searx.plugins import PluginCfg


class SXNGPlugin(Plugin):
    """Plugin that calculates basic mathematical expressions entered via the
    search bar.  The results are displayed in area for the "answers".
    """

    # Identifier of the plugin; also passed to PluginInfo below.
    id = "calculator"

    def __init__(self, plg_cfg: "PluginCfg") -> None:
        """Initialize the base plugin with *plg_cfg* and set the plugin's
        descriptive metadata (``self.info``)."""
        super().__init__(plg_cfg)

        self.info = PluginInfo(
            id=self.id,
            name=gettext("Basic Calculator"),
            description=gettext("Calculate mathematical expressions via the search bar"),
            preference_section="general",
        )
operators: dict[type, typing.Callable] = { ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul, ast.Div: operator.truediv, ast.Pow: operator.pow, ast.BitXor: operator.xor, ast.USub: operator.neg, } # with multiprocessing.get_context("fork") we are ready for Py3.14 (by emulating # the old behavior "fork") but it will not solve the core problem of fork, nor # will it remove the deprecation warnings in py3.12 & py3.13. Issue is # ddiscussed here: https://github.com/searxng/searxng/issues/4159 mp_fork = multiprocessing.get_context("fork") def _eval_expr(expr): """ >>> _eval_expr('2^6') 64 >>> _eval_expr('2**6') 64 >>> _eval_expr('1 + 2*3**(4^5) / (6 + -7)') -5.0 """ try: return _eval(ast.parse(expr, mode='eval').body) except ZeroDivisionError: # This is undefined return "" def _eval(node): if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)): return node.value if isinstance(node, ast.BinOp): return operators[type(node.op)](_eval(node.left), _eval(node.right)) if isinstance(node, ast.UnaryOp): return operators[type(node.op)](_eval(node.operand)) raise TypeError(node) def handler(q: multiprocessing.Queue, func, args, **kwargs): # pylint:disable=invalid-name try: q.put(func(*args, **kwargs)) except: q.put(None) raise def timeout_func(timeout, func, *args, **kwargs): que = mp_fork.Queue() p = mp_fork.Process(target=handler, args=(que, func, args), kwargs=kwargs) p.start() p.join(timeout=timeout) ret_val = None # pylint: disable=used-before-assignment,undefined-variable if not p.is_alive(): ret_val = que.get() else: logger.debug("terminate function after timeout is exceeded") # type: ignore p.terminate() p.join() p.close() return ret_val