a
    h$                     @  s6  U d dl mZ d dlZd dlZd dlmZmZmZ d dlm	Z	m
Z
 d dlmZmZmZ d dlmZmZ d dlZd dlZd dlmZ d dlmZ d	d
lmZmZ d	dlmZ d	dlmZ ejdk rd dl m!Z! da"de#d< da$de#d< d a%dddddZ&e
ddddddZ'ddd d!d"Z(ej)d#d$ddd%d&d'd(Z*ej)d#d)ddddd*d+d,Z+ej)d#d)dd-d.d/d0Z,ej-d1e d2ddd3d4d5Z.ej-ddd6d7d8Z/ej-ddd6d9d:Z0G d;d< d<Z1ej-d=d>d<d?d@dAZ2ej-d=d>d<d?dBdCZ3ej-dDdEdFdGdHZ4ej-dDdEdIdJdKZ5dS )L    )annotationsN)Callable	GeneratorIterator)	ExitStackcontextmanager)isasyncgenfunctioniscoroutinefunctionismethod)Anycast)
SubRequest)Exit   )get_all_backendsget_async_backend)iterate_exceptions)
TestRunner)      )ExceptionGroupzTestRunner | None_current_runnerzExitStack | None_runner_stackobjectztuple[str, dict[str, Any]])backendreturnc                 C  sj   t | tr| i fS t | tr^t| dkr^t | d tr^t | d tr^ttttttf f | S tdd S )N   r   r   z@anyio_backend must be either a string or tuple of (string, dict))
isinstancestrtuplelendictr   r   	TypeError)r    r#   O/var/www/html/assistant/venv/lib/python3.9/site-packages/anyio/pytest_plugin.pyextract_backend_and_options   s    
r%   r   zdict[str, Any]zIterator[TestRunner])backend_namebackend_optionsr   c                 c  s   t d u rZt| }t atjd d u rBtj| }ttjj	| |pHi }t
||a td7 az2t V  W td8 atstd usJ t  d  aa n*td8 atstd usJ t  d  aa 0 d S Nr   )r   r   r   r   sniffioZcurrent_async_library_cvargetsetcallbackresetenter_contextZcreate_test_runner_runner_leasesclose)r&   r'   Zasynclibtokenr#   r#   r$   
get_runner%   s.    
r2   r   None)configr   c                 C  s   |  dd d S )NmarkerszManyio: mark the (coroutine function) test to be run asynchronously via anyio.)Zaddinivalue_line)r4   r#   r#   r$   pytest_configureC   s    r6   T)ZhookwrapperzGenerator[Any])
fixturedefrequestr   c                 #  s   dddddd fdd}| j  t s4t rd|jv r|| _ | j}d| jv  sf|  jd7  _d| jv  s|  jd	7  _zd V W  | _ || _S  | _ || _0 d V S )
Nr   r   )argsanyio_backendr8   kwargsr   c                 ?  s   |j r2t r2t jt|j u r2 j|j }n }t| \}}rN| |d< rZ||d< t||:}t|r|	||E d H  n|
||V  W d    n1 s0    Y  d S )Nr:   r8   )instancer
   type__self____func____get__r%   r2   r   Zrun_asyncgen_fixtureZrun_fixture)r:   r8   r9   r;   Z
local_funcr&   r'   runnerfuncZhas_backend_argZhas_request_argr#   r$   wrapperL   s"    z%pytest_fixture_setup.<locals>.wrapperr:   r:   r8   r8   )rC   r   r	   Zfixturenamesargnames)r7   r8   rD   Zoriginal_argnamer#   rB   r$   pytest_fixture_setupJ   s(    
  rH   )Ztryfirst)	collectornameobjr   c                 C  sj   |  ||rft|dr|jjn|}t|rf| d}t|dd}|sVtdd |D rftj	
d| d S )N
hypothesisanyioZ
pytestmarkr#   c                 s  s   | ]}|j d kV  qdS )rM   N)rJ   ).0markerr#   r#   r$   	<genexpr>       z,pytest_pycollect_makeitem.<locals>.<genexpr>r:   )ZistestfunctionhasattrrL   
inner_testr	   Zget_closest_markergetattranypytestmarkZusefixtures)rI   rJ   rK   Z
inner_funcrO   Zown_markersr#   r#   r$   pytest_pycollect_makeitem}   s    
rX   zbool | None)
pyfuncitemr   c                   s(  ddd fdd}| j d}|r$t|\ t| jdrn| jjjj|jkrjtrj|| jj_d S t| jr$| j fdd	| j	j
D }t n}z|| j| W nJ ty } z2t|D ]}t|tttfr||q̂ W Y d }~n
d }~0 0 W d    n1 s0    Y  d
S d S )Nr   r3   )r;   r   c                    s:   t  }||  W d    n1 s,0    Y  d S N)r2   run_test)r;   rA   )r&   r'   original_funcr#   r$   run_with_hypothesis   s    z/pytest_pyfunc_call.<locals>.run_with_hypothesisr:   rL   c                   s   i | ]}| | qS r#   r#   )rN   arg)funcargsr#   r$   
<dictcomp>   rQ   z&pytest_pyfunc_call.<locals>.<dictcomp>T)r_   r*   r%   rR   rK   rL   rS   __qualname__r	   Z_fixtureinforG   r2   r[   r   r   r   r   KeyboardInterrupt
SystemExit)rY   r]   r   ZtestargsrA   Zexcgrpexcr#   )r&   r'   r_   r\   r$   pytest_pyfunc_call   s.    

8re   module)scopeparams)r8   r   c                 C  s   | j S rZ   )paramrF   r#   r#   r$   r:      s    r:   )r:   r   c                 C  s   t | tr| S | d S d S )Nr   r   r   rE   r#   r#   r$   anyio_backend_name   s    
rk   c                 C  s   t | tri S | d S d S r(   rj   rE   r#   r#   r$   anyio_backend_options   s    
rl   c                   @  sD   e Zd ZdZdddddZedddd	ZddddddZd
S )FreePortFactoryaO  
    Manages port generation based on specified socket kind, ensuring no duplicate
    ports are generated.

    This class provides functionality for generating available free ports on the
    system. It is initialized with a specific socket kind and can generate ports
    for given address families while avoiding reuse of previously generated ports.

    Users should not instantiate this class directly, but use the
    ``free_tcp_port_factory`` and ``free_udp_port_factory`` fixtures instead. For simple
    uses cases, ``free_tcp_port`` and ``free_udp_port`` can be used instead.
    zsocket.SocketKindr3   )kindr   c                 C  s   || _ tt  | _d S rZ   )_kindr+   int
_generated)selfrn   r#   r#   r$   __init__   s    zFreePortFactory.__init__r   c                 C  s   | j S )z
        The type of socket connection (e.g., :data:`~socket.SOCK_STREAM` or
        :data:`~socket.SOCK_DGRAM`) used to bind for checking port availability

        )ro   )rr   r#   r#   r$   rn      s    zFreePortFactory.kindNzsocket.AddressFamily | Nonerp   )familyr   c              
   C  s   |dur|g}nt jg}t jr*|t j d}t }|D ]f}|t  || j}|t jkr`dnd}z|||f W n t	y   Y  qY n0 |s:|
 d }q:|| jvr| j| |W  d   S W d   q*1 s0    Y  q*dS )z
        Return an unbound port for the given address family.

        :param family: if omitted, both IPv4 and IPv6 addresses will be tried
        :return: a port number

        Nr   z::1z	127.0.0.1r   )socketAF_INEThas_ipv6appendAF_INET6r   r.   ro   bindOSErrorgetsocknamerq   add)rr   ru   Zfamiliesportstacksockaddrr#   r#   r$   __call__   s&    
zFreePortFactory.__call__)N)__name__
__module__ra   __doc__rs   propertyrn   r   r#   r#   r#   r$   rm      s
   rm   session)rg   rt   c                   C  s
   t tjS rZ   )rm   rv   SOCK_STREAMr#   r#   r#   r$   free_tcp_port_factory   s    r   c                   C  s
   t tjS rZ   )rm   rv   
SOCK_DGRAMr#   r#   r#   r$   free_udp_port_factory  s    r   zCallable[[], int]rp   )r   r   c                 C  s   |  S rZ   r#   )r   r#   r#   r$   free_tcp_port	  s    r   )r   r   c                 C  s   |  S rZ   r#   )r   r#   r#   r$   free_udp_port  s    r   )6
__future__r   rv   syscollections.abcr   r   r   
contextlibr   r   inspectr   r	   r
   typingr   r   rV   r)   Z_pytest.fixturesr   Z_pytest.outcomesr   Z_core._eventloopr   r   Z_core._exceptionsr   abcr   version_infoZexceptiongroupr   r   __annotations__r   r/   r%   r2   r6   ZhookimplrH   rX   re   Zfixturer:   rk   rl   rm   r   r   r   r   r#   r#   r#   r$   <module>   sV   


2


$=

