from _typeshed import StrOrBytesPath
from collections.abc import Callable, Iterator
from logging import Logger
from typing import IO
from typing_extensions import Self, TypeAlias

from paramiko.channel import Channel
from paramiko.sftp import BaseSFTP
from paramiko.sftp_attr import SFTPAttributes
from paramiko.sftp_file import SFTPFile
from paramiko.transport import Transport
from paramiko.util import ClosingContextManager

_Callback: TypeAlias = Callable[[int, int], object]

b_slash: bytes

class SFTPClient(BaseSFTP, ClosingContextManager):
    sock: Channel
    ultra_debug: bool
    request_number: int
    logger: Logger
    def __init__(self, sock: Channel) -> None: ...
    @classmethod
    def from_transport(cls, t: Transport, window_size: int | None = None, max_packet_size: int | None = None) -> Self | None: ...
    def close(self) -> None: ...
    def get_channel(self) -> Channel | None: ...
    def listdir(self, path: str = ".") -> list[str]: ...
    def listdir_attr(self, path: str = ".") -> list[SFTPAttributes]: ...
    def listdir_iter(self, path: bytes | str = ".", read_aheads: int = 50) -> Iterator[SFTPAttributes]: ...
    def open(self, filename: bytes | str, mode: str = "r", bufsize: int = -1) -> SFTPFile: ...
    file = open
    def remove(self, path: bytes | str) -> None: ...
    unlink = remove
    def rename(self, oldpath: bytes | str, newpath: bytes | str) -> None: ...
    def posix_rename(self, oldpath: bytes | str, newpath: bytes | str) -> None: ...
    def mkdir(self, path: bytes | str, mode: int = 511) -> None: ...
    def rmdir(self, path: bytes | str) -> None: ...
    def stat(self, path: bytes | str) -> SFTPAttributes: ...
    def lstat(self, path: bytes | str) -> SFTPAttributes: ...
    def symlink(self, source: bytes | str, dest: bytes | str) -> None: ...
    def chmod(self, path: bytes | str, mode: int) -> None: ...
    def chown(self, path: bytes | str, uid: int, gid: int) -> None: ...
    def utime(self, path: bytes | str, times: tuple[float, float] | None) -> None: ...
    def truncate(self, path: bytes | str, size: int) -> None: ...
    def readlink(self, path: bytes | str) -> str | None: ...
    def normalize(self, path: bytes | str) -> str: ...
    def chdir(self, path: None | bytes | str = None) -> None: ...
    def getcwd(self) -> str | None: ...
    def putfo(
        self, fl: IO[bytes], remotepath: bytes | str, file_size: int = 0, callback: _Callback | None = None, confirm: bool = True
    ) -> SFTPAttributes: ...
    def put(
        self, localpath: StrOrBytesPath, remotepath: bytes | str, callback: _Callback | None = None, confirm: bool = True
    ) -> SFTPAttributes: ...
    def getfo(
        self,
        remotepath: bytes | str,
        fl: IO[bytes],
        callback: _Callback | None = None,
        prefetch: bool = True,
        max_concurrent_prefetch_requests: int | None = None,
    ) -> int: ...
    def get(
        self,
        remotepath: bytes | str,
        localpath: StrOrBytesPath,
        callback: _Callback | None = None,
        prefetch: bool = True,
        max_concurrent_prefetch_requests: int | None = None,
    ) -> None: ...

class SFTP(SFTPClient): ...
