-
Notifications
You must be signed in to change notification settings - Fork 75
/
Copy pathsession.pyx
813 lines (689 loc) · 29.3 KB
/
session.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
# This file is part of ssh2-python.
# Copyright (C) 2017-2020 Panos Kittenis
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation, version 2.1.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
from cpython cimport PyObject_AsFileDescriptor
from libc.stdlib cimport calloc, malloc, free
from libc.string cimport memcpy
from libc.time cimport time_t
from cython.operator cimport dereference as c_dereference
from agent cimport PyAgent, agent_auth, agent_init, init_connect_agent
from channel cimport PyChannel
from exceptions import SessionHostKeyError, KnownHostError, \
PublicKeyInitError, ChannelError
from listener cimport PyListener
from sftp cimport PySFTP
from publickey cimport PyPublicKeySystem
from utils cimport to_bytes, to_str, to_str_len, handle_error_codes
from statinfo cimport StatInfo
from knownhost cimport PyKnownHost
from fileinfo cimport FileInfo
cimport c_ssh2
cimport c_sftp
cimport c_pkey
# Python-level aliases for constants declared in the c_ssh2 C definitions.

# Direction flags, see Session.block_directions.
LIBSSH2_SESSION_BLOCK_INBOUND = c_ssh2.LIBSSH2_SESSION_BLOCK_INBOUND
LIBSSH2_SESSION_BLOCK_OUTBOUND = c_ssh2.LIBSSH2_SESSION_BLOCK_OUTBOUND
# Hash types accepted by Session.hostkey_hash.
LIBSSH2_HOSTKEY_HASH_MD5 = c_ssh2.LIBSSH2_HOSTKEY_HASH_MD5
LIBSSH2_HOSTKEY_HASH_SHA1 = c_ssh2.LIBSSH2_HOSTKEY_HASH_SHA1
LIBSSH2_HOSTKEY_HASH_SHA256 = c_ssh2.LIBSSH2_HOSTKEY_HASH_SHA256
# Host key types, as reported in the second element of Session.hostkey().
LIBSSH2_HOSTKEY_TYPE_ECDSA_256 = c_ssh2.LIBSSH2_HOSTKEY_TYPE_ECDSA_256
LIBSSH2_HOSTKEY_TYPE_ECDSA_384 = c_ssh2.LIBSSH2_HOSTKEY_TYPE_ECDSA_384
LIBSSH2_HOSTKEY_TYPE_ECDSA_521 = c_ssh2.LIBSSH2_HOSTKEY_TYPE_ECDSA_521
LIBSSH2_HOSTKEY_TYPE_ED25519 = c_ssh2.LIBSSH2_HOSTKEY_TYPE_ED25519
LIBSSH2_HOSTKEY_TYPE_UNKNOWN = c_ssh2.LIBSSH2_HOSTKEY_TYPE_UNKNOWN
LIBSSH2_HOSTKEY_TYPE_RSA = c_ssh2.LIBSSH2_HOSTKEY_TYPE_RSA
LIBSSH2_HOSTKEY_TYPE_DSS = c_ssh2.LIBSSH2_HOSTKEY_TYPE_DSS
cdef class MethodType:
    """Holder for a libssh2 session method type value.

    Instances are what :py:func:`Session.supported_algs`,
    :py:func:`Session.methods` and :py:func:`Session.method_pref` accept
    as their ``method_type`` argument.
    """

    def __cinit__(self, value):
        # Store the raw method type value; it is read back as
        # ``method_type.value`` by the Session methods.
        self.value = value
# Session method types, each wrapped in a MethodType instance so they can be
# passed to Session.supported_algs, Session.methods and Session.method_pref.
LIBSSH2_METHOD_KEX = MethodType(c_ssh2.LIBSSH2_METHOD_KEX)
LIBSSH2_METHOD_HOSTKEY = MethodType(c_ssh2.LIBSSH2_METHOD_HOSTKEY)
LIBSSH2_METHOD_CRYPT_CS = MethodType(c_ssh2.LIBSSH2_METHOD_CRYPT_CS)
LIBSSH2_METHOD_CRYPT_SC = MethodType(c_ssh2.LIBSSH2_METHOD_CRYPT_SC)
LIBSSH2_METHOD_MAC_CS = MethodType(c_ssh2.LIBSSH2_METHOD_MAC_CS)
LIBSSH2_METHOD_MAC_SC = MethodType(c_ssh2.LIBSSH2_METHOD_MAC_SC)
LIBSSH2_METHOD_COMP_CS = MethodType(c_ssh2.LIBSSH2_METHOD_COMP_CS)
LIBSSH2_METHOD_COMP_SC = MethodType(c_ssh2.LIBSSH2_METHOD_COMP_SC)
LIBSSH2_METHOD_LANG_CS = MethodType(c_ssh2.LIBSSH2_METHOD_LANG_CS)
LIBSSH2_METHOD_LANG_SC = MethodType(c_ssh2.LIBSSH2_METHOD_LANG_SC)
cdef void kbd_callback(const char *name, int name_len,
                       const char *instruction, int instruction_len,
                       int num_prompts,
                       const c_ssh2.LIBSSH2_USERAUTH_KBDINT_PROMPT *prompts,
                       c_ssh2.LIBSSH2_USERAUTH_KBDINT_RESPONSE *responses,
                       void **abstract) except *:
    """C callback handed to libssh2 for keyboard-interactive authentication.

    Looks up the owning :py:class:`Session` through ``abstract`` and, if a
    Python callback is registered on it, calls that callback with
    ``(name, instruction, prompts)`` where ``name`` and ``instruction`` are
    ``bytes`` and ``prompts`` is a list of prompt strings. The callback must
    return a list holding one response per prompt; each response is copied
    into a freshly allocated C buffer stored in ``responses``.

    :raises: :py:class:`MemoryError` if a response buffer cannot be
      allocated.
    """
    cdef Session py_sess = <Session>c_dereference(abstract)
    cdef list py_prompts = []
    cdef list py_responses
    cdef bytes response
    cdef Py_ssize_t resp_len
    cdef char *buf
    cdef int i
    if py_sess._kbd_callback is None:
        return
    for i in range(num_prompts):
        py_prompts.append(to_str_len(prompts[i].text, prompts[i].length))
    py_responses = py_sess._kbd_callback(
        <bytes> name[:name_len], <bytes> instruction[:instruction_len],
        py_prompts)
    for i in range(num_prompts):
        response = to_bytes(py_responses[i])
        resp_len = len(response)
        # Allocate one spare byte so that an empty response still yields a
        # non-NULL buffer and a NULL result always means allocation failure.
        # Ownership of the buffer is expected to pass to libssh2 once it is
        # stored in ``responses`` - see the libssh2 keyboard-interactive docs.
        buf = <char *>calloc(resp_len + 1, sizeof(char))
        if buf is NULL:
            raise MemoryError
        if resp_len > 0:
            memcpy(buf, <const char *>response, resp_len)
        responses[i].text = buf
        responses[i].length = resp_len
cdef class Session:
"""LibSSH2 Session class providing session functions"""
def __cinit__(self):
    """Create the underlying libssh2 session.

    The session is created with libssh2's default allocators and with this
    object as the session's abstract pointer.

    :raises: :py:class:`MemoryError` if libssh2 cannot create a session.
    """
    cdef void *abstract = <void*> self
    self._session = c_ssh2.libssh2_session_init_ex(
        NULL, NULL, NULL, abstract)
    if self._session is NULL:
        raise MemoryError
    # No socket is associated with the session until handshake is called.
    self._sock = 0
    self.sock = None
    self._kbd_callback = None
def __dealloc__(self):
    """Free the libssh2 session, if one is held."""
    if self._session is NULL:
        return
    c_ssh2.libssh2_session_free(self._session)
    # Clear the handle so it cannot be freed twice.
    self._session = NULL
def disconnect(self):
    """Send a disconnect message for this session.

    :returns: Result of ``handle_error_codes`` for the libssh2 return code.
    """
    cdef int ret
    with nogil:
        ret = c_ssh2.libssh2_session_disconnect(self._session, b"end")
    return handle_error_codes(ret)
def handshake(self, sock not None):
    """Perform SSH handshake on the given socket.

    Must be called after Session initialisation.

    :param sock: Connected socket, or any object with a file descriptor.
    :returns: Result of ``handle_error_codes`` for the libssh2 return code.
    """
    cdef int fd = PyObject_AsFileDescriptor(sock)
    cdef int ret
    with nogil:
        ret = c_ssh2.libssh2_session_handshake(self._session, fd)
    # Remember both the descriptor and the Python socket object.
    self._sock = fd
    self.sock = sock
    return handle_error_codes(ret)
def startup(self, sock):
    """Deprecated - use self.handshake"""
    cdef int ret
    cdef int fd = PyObject_AsFileDescriptor(sock)
    ret = c_ssh2.libssh2_session_startup(self._session, fd)
    return handle_error_codes(ret)
def set_blocking(self, bint blocking):
    """Switch session blocking mode on or off.

    Session default is blocking unless set otherwise.

    :param blocking: ``True`` for blocking, ``False`` for non-blocking.
    :type blocking: bool"""
    with nogil:
        c_ssh2.libssh2_session_set_blocking(self._session, blocking)
def get_blocking(self):
    """Report whether session blocking mode is enabled.

    :rtype: bool"""
    cdef int state
    with nogil:
        state = c_ssh2.libssh2_session_get_blocking(self._session)
    return state != 0
def set_timeout(self, long timeout):
    """Set how long, in milliseconds, a blocking call may wait.

    Once the timeout elapses the situation is considered an error and
    :py:class:`ssh2.error_codes.LIBSSH2_ERROR_TIMEOUT` is returned.

    By default, or if the timeout is set to zero, blocking calls do not
    time out.

    :param timeout: Milliseconds to wait before timeout."""
    with nogil:
        c_ssh2.libssh2_session_set_timeout(self._session, timeout)
def get_timeout(self):
    """Return the session's current timeout setting."""
    cdef long current
    with nogil:
        current = c_ssh2.libssh2_session_get_timeout(self._session)
    return current
def userauth_authenticated(self):
    """Report whether the session's user is authenticated.

    :rtype: bool"""
    cdef bint authenticated
    with nogil:
        authenticated = c_ssh2.libssh2_userauth_authenticated(
            self._session)
    return True if authenticated else False
def userauth_list(self, username not None):
    """Retrieve the list of available authentication methods.

    :param username: User name to query authentication methods for.
    :returns: List of method names, or ``None`` if libssh2 returned no
      list.
    :rtype: list"""
    cdef bytes b_username = to_bytes(username)
    cdef size_t name_len = len(b_username)
    cdef char *c_username = b_username
    cdef char *c_methods
    cdef str methods
    with nogil:
        c_methods = c_ssh2.libssh2_userauth_list(
            self._session, c_username, name_len)
    if c_methods is NULL:
        return None
    # libssh2 reports the methods as one comma separated string.
    methods = to_str(c_methods)
    return methods.split(',')
def userauth_publickey_fromfile(self, username not None,
                                privatekey not None,
                                passphrase='',
                                publickey=None):
    """Authenticate with a public key read from file.

    :param username: User name to authenticate as.
    :param privatekey: Path to private key file.
    :param passphrase: Private key passphrase, empty by default.
    :param publickey: Optional path to public key file. ``NULL`` is passed
      to libssh2 when not given.
    :rtype: int"""
    cdef int ret
    cdef bytes b_username = to_bytes(username)
    cdef bytes b_publickey = None
    cdef char *c_publickey = NULL
    if publickey is not None:
        b_publickey = to_bytes(publickey)
        c_publickey = b_publickey
    cdef bytes b_privatekey = to_bytes(privatekey)
    cdef bytes b_passphrase = to_bytes(passphrase)
    cdef char *c_username = b_username
    cdef char *c_privatekey = b_privatekey
    cdef char *c_passphrase = b_passphrase
    with nogil:
        ret = c_ssh2.libssh2_userauth_publickey_fromfile(
            self._session, c_username, c_publickey,
            c_privatekey, c_passphrase)
    return handle_error_codes(ret)
def userauth_publickey(self, username not None,
                       bytes pubkeydata not None):
    """Authenticate with the provided public key data.

    No signing callback or abstract pointer is passed to libssh2.

    :param username: User name to authenticate as
    :type username: str
    :param pubkeydata: Public key data
    :type pubkeydata: bytes

    :rtype: int"""
    cdef bytes b_username = to_bytes(username)
    cdef char *c_username = b_username
    cdef size_t key_len = len(pubkeydata)
    cdef unsigned char *c_key = pubkeydata
    cdef int ret
    with nogil:
        ret = c_ssh2.libssh2_userauth_publickey(
            self._session, c_username, c_key, key_len, NULL, NULL)
    return handle_error_codes(ret)
def userauth_hostbased_fromfile(self,
                                username not None,
                                privatekey not None,
                                hostname not None,
                                publickey=None,
                                passphrase=''):
    """Perform host based authentication with keys read from file.

    :param username: User name to authenticate as.
    :param privatekey: Path to private key file.
    :param hostname: Host name passed to libssh2 for the authentication.
    :param publickey: Optional path to public key file. ``NULL`` is passed
      to libssh2 when not given.
    :param passphrase: Private key passphrase, empty by default.
    :returns: Result of ``handle_error_codes`` for the libssh2 return code.
    """
    cdef int ret
    cdef bytes b_username = to_bytes(username)
    cdef bytes b_publickey = None
    cdef char *c_publickey = NULL
    if publickey is not None:
        b_publickey = to_bytes(publickey)
        c_publickey = b_publickey
    cdef bytes b_privatekey = to_bytes(privatekey)
    cdef bytes b_passphrase = to_bytes(passphrase)
    cdef bytes b_hostname = to_bytes(hostname)
    cdef char *c_username = b_username
    cdef char *c_privatekey = b_privatekey
    cdef char *c_passphrase = b_passphrase
    cdef char *c_hostname = b_hostname
    with nogil:
        ret = c_ssh2.libssh2_userauth_hostbased_fromfile(
            self._session, c_username, c_publickey,
            c_privatekey, c_passphrase, c_hostname)
    return handle_error_codes(ret)
def userauth_publickey_frommemory(
        self, username, bytes privatekeyfiledata,
        passphrase='', bytes publickeyfiledata=None):
    """Authenticate with key data held in memory.

    :param username: User name to authenticate as.
    :param privatekeyfiledata: Contents of the private key file.
    :type privatekeyfiledata: bytes
    :param passphrase: Private key passphrase, empty by default.
    :param publickeyfiledata: Optional contents of the public key file.
      ``NULL`` with zero length is passed to libssh2 when not given.
    :type publickeyfiledata: bytes
    :returns: Result of ``handle_error_codes`` for the libssh2 return code.
    """
    cdef int ret
    cdef bytes b_username = to_bytes(username)
    cdef bytes b_passphrase = to_bytes(passphrase)
    cdef char *c_username = b_username
    cdef char *c_passphrase = b_passphrase
    cdef char *c_pubkey = NULL
    cdef char *c_privkey = privatekeyfiledata
    cdef size_t pubkey_len = 0
    cdef size_t username_len = len(b_username)
    cdef size_t privkey_len = len(privatekeyfiledata)
    if publickeyfiledata is not None:
        c_pubkey = publickeyfiledata
        pubkey_len = len(publickeyfiledata)
    with nogil:
        ret = c_ssh2.libssh2_userauth_publickey_frommemory(
            self._session, c_username, username_len, c_pubkey,
            pubkey_len, c_privkey, privkey_len, c_passphrase)
    return handle_error_codes(ret)
def userauth_password(self, username not None, password not None):
    """Authenticate with user name and password.

    :param username: User name to authenticate.
    :type username: str
    :param password: Password
    :type password: str"""
    cdef bytes b_username = to_bytes(username)
    cdef bytes b_password = to_bytes(password)
    cdef const char *c_username = b_username
    cdef const char *c_password = b_password
    cdef int ret
    with nogil:
        ret = c_ssh2.libssh2_userauth_password(
            self._session, c_username, c_password)
    return handle_error_codes(ret)
def userauth_keyboardinteractive(self, username not None,
                                 password not None):
    """Perform keyboard-interactive authentication with a fixed password.

    Delegates to :py:func:`userauth_keyboardinteractive_callback` with a
    callback that answers with a single-element list holding ``password``.

    :param username: User name to authenticate.
    :type username: str
    :param password: Password
    :type password: str
    """
    def _reply_with_password(*args, password=password):
        # Prompt details are ignored; the one response is the password.
        return [password]
    return self.userauth_keyboardinteractive_callback(
        username, _reply_with_password)
def userauth_keyboardinteractive_callback(self, username not None,
                                          callback not None):
    """Perform keyboard-interactive authentication using a callback.

    The callback is registered on the session for the duration of the
    libssh2 call and cleared again afterwards.

    :param username: User name to authenticate.
    :type username: str
    :param callback: Callable invoked as
      ``callback(name, instruction, prompts)`` which must return a list
      with one response per prompt.
    :type callback: callable
    """
    cdef bytes b_username = to_bytes(username)
    cdef const char *c_username = b_username
    cdef int ret
    self._kbd_callback = callback
    ret = c_ssh2.libssh2_userauth_keyboard_interactive(
        self._session, c_username, &kbd_callback)
    self._kbd_callback = None
    return handle_error_codes(ret)
def agent_init(self):
    """Initialise an SSH agent for this session.

    :rtype: :py:class:`ssh2.agent.Agent`
    """
    cdef c_ssh2.LIBSSH2_AGENT *c_agent
    with nogil:
        c_agent = agent_init(self._session)
    return PyAgent(c_agent, self)
def agent_auth(self, username not None):
    """Convenience function for performing user authentication via SSH Agent.

    Initialises, connects to, gets list of identities from and attempts
    authentication with each identity from SSH agent.

    Note that agent connections cannot be used in non-blocking mode -
    clients should call `set_blocking(0)` *after* calling this function.

    On completion, or any errors, agent is disconnected and resources freed.

    All steps are performed in C space which makes this function perform
    better than calling the individual Agent class functions from
    Python.

    :param username: User name to authenticate as.

    :raises: :py:class:`MemoryError` on error initialising agent
    :raises: :py:class:`ssh2.exceptions.AgentConnectionError` on error
      connecting to agent
    :raises: :py:class:`ssh2.exceptions.AgentListIdentitiesError` on error
      getting identities from agent
    :raises: :py:class:`ssh2.exceptions.AgentAuthenticationError` on no
      successful authentication with all available identities.
    :raises: :py:class:`ssh2.exceptions.AgentGetIdentityError` on error
      getting known identity from agent

    :rtype: None"""
    cdef bytes b_username = to_bytes(username)
    cdef char *_username = b_username
    cdef c_ssh2.LIBSSH2_AGENT *agent = NULL
    # Agent set up runs with the GIL held; only the authentication attempt
    # itself is performed without it.
    agent = init_connect_agent(self._session)
    with nogil:
        agent_auth(_username, agent)
def open_session(self):
    """Open a new channel session.

    :returns: New channel, or the result of ``handle_error_codes`` for the
      session's last error number when no channel could be opened.
    :rtype: :py:class:`ssh2.channel.Channel`
    """
    cdef c_ssh2.LIBSSH2_CHANNEL *chan
    with nogil:
        chan = c_ssh2.libssh2_channel_open_session(self._session)
    if chan is not NULL:
        return PyChannel(chan, self)
    return handle_error_codes(
        c_ssh2.libssh2_session_last_errno(self._session))
def direct_tcpip_ex(self, host not None, int port,
                    shost not None, int sport):
    """Open direct TCP/IP channel to host:port, stating the source
    host and port to libssh2.

    :param host: Target host.
    :param port: Target port.
    :type port: int
    :param shost: Source host passed to libssh2.
    :param sport: Source port passed to libssh2.
    :type sport: int
    :returns: New channel, or the result of ``handle_error_codes`` for the
      session's last error number when no channel could be opened.
    """
    cdef bytes b_host = to_bytes(host)
    cdef bytes b_shost = to_bytes(shost)
    cdef char *c_host = b_host
    cdef char *c_shost = b_shost
    cdef c_ssh2.LIBSSH2_CHANNEL *chan
    with nogil:
        chan = c_ssh2.libssh2_channel_direct_tcpip_ex(
            self._session, c_host, port, c_shost, sport)
    if chan is not NULL:
        return PyChannel(chan, self)
    return handle_error_codes(
        c_ssh2.libssh2_session_last_errno(self._session))
def direct_tcpip(self, host not None, int port):
    """Open direct TCP/IP channel to host:port

    Channel will be listening on an available open port on client side
    as assigned by OS.

    :param host: Target host.
    :param port: Target port.
    :type port: int
    :returns: New channel, or the result of ``handle_error_codes`` for the
      session's last error number when no channel could be opened.
    """
    cdef bytes b_host = to_bytes(host)
    cdef char *c_host = b_host
    cdef c_ssh2.LIBSSH2_CHANNEL *chan
    with nogil:
        chan = c_ssh2.libssh2_channel_direct_tcpip(
            self._session, c_host, port)
    if chan is not NULL:
        return PyChannel(chan, self)
    return handle_error_codes(
        c_ssh2.libssh2_session_last_errno(self._session))
def block_directions(self):
    """Get blocked directions for the current session.

    From libssh2 documentation:

    Can be a combination of:

    ``ssh2.session.LIBSSH2_SESSION_BLOCK_INBOUND``: Inbound direction
    blocked.

    ``ssh2.session.LIBSSH2_SESSION_BLOCK_OUTBOUND``: Outbound direction
    blocked.

    Application should wait for data to be available for socket prior to
    calling a libssh2 function again. If ``LIBSSH2_SESSION_BLOCK_INBOUND``
    is set select should contain the session socket in readfds set.

    Correspondingly in case of ``LIBSSH2_SESSION_BLOCK_OUTBOUND`` writefds
    set should contain the socket.

    :rtype: int"""
    cdef int directions
    with nogil:
        directions = c_ssh2.libssh2_session_block_directions(self._session)
    return directions
def forward_listen(self, int port):
    """Create a forward listener on the given port.

    :param port: Port to listen on.
    :type port: int

    :returns: New listener, or the result of ``handle_error_codes`` for
      the session's last error number when none could be created.
    :rtype: :py:class:`ssh2.listener.Listener` or None"""
    cdef c_ssh2.LIBSSH2_LISTENER *c_listener
    with nogil:
        c_listener = c_ssh2.libssh2_channel_forward_listen(
            self._session, port)
    if c_listener is not NULL:
        return PyListener(c_listener, self)
    return handle_error_codes(
        c_ssh2.libssh2_session_last_errno(self._session))
def forward_listen_ex(self, int queue_maxsize, host=None, int port=0):
    """
    Instruct the remote SSH server to begin listening for inbound
    TCP/IP connections. New connections will be queued by the library
    until accepted by ``ssh2.channel.Channel.forward_accept``.

    :param queue_maxsize: Maximum number of pending connections to queue
      before rejecting further attempts.
    :type queue_maxsize: int
    :param host: Address to bind to on the remote host. Binding
      to 0.0.0.0 (default when `None` is passed) will bind to all available
      addresses.
    :type host: str
    :param port: port to bind to on the remote host. When 0 is passed
      (the default), the remote host will select the first available
      dynamic port.
    :type port: int

    :returns: (listener, bound_port) tuple where bound_port is the
      listen port bound on the remote host. Useful when requesting
      dynamic port numbers. When no listener could be created the tuple
      is ``(handle_error_codes(<last errno>), 0)`` instead.
    :rtype: (:py:class:`ssh2.listener.Listener`, int)
    """
    cdef c_ssh2.LIBSSH2_LISTENER *c_listener
    cdef bytes b_host = None
    cdef char *c_host = NULL
    cdef int bound_port = 0
    if host is not None:
        b_host = to_bytes(host)
    # A NULL host is handed to libssh2 when no host was given.
    if b_host is not None:
        c_host = b_host
    with nogil:
        c_listener = c_ssh2.libssh2_channel_forward_listen_ex(
            self._session, c_host, port, &bound_port, queue_maxsize)
    if c_listener is not NULL:
        return (PyListener(c_listener, self), bound_port)
    return (handle_error_codes(
        c_ssh2.libssh2_session_last_errno(self._session)), 0)
def sftp_init(self):
    """Initialise an SFTP channel on this session.

    :returns: New SFTP object, or the result of ``handle_error_codes`` for
      the session's last error number when initialisation failed.
    :rtype: :py:class:`ssh2.sftp.SFTP`
    """
    cdef c_sftp.LIBSSH2_SFTP *c_handle
    with nogil:
        c_handle = c_sftp.libssh2_sftp_init(self._session)
    if c_handle is not NULL:
        return PySFTP(c_handle, self)
    return handle_error_codes(
        c_ssh2.libssh2_session_last_errno(self._session))
def last_error(self, size_t msg_size=1024):
    """Retrieve last error message from libssh2, if any.

    Returns an empty bytes string on no error message.

    :param msg_size: Unused. Kept for backwards compatibility - libssh2
      allocates the message buffer itself.
    :type msg_size: int

    :rtype: bytes
    """
    cdef char *_error_msg = NULL
    cdef bytes msg = b''
    cdef int errmsg_len = 0
    with nogil:
        # want_buf=1 asks libssh2 to hand back a buffer owned by the caller.
        # It overwrites the pointer passed in, so pre-allocating a buffer
        # here would only leak it.
        c_ssh2.libssh2_session_last_error(
            self._session, &_error_msg, &errmsg_len, 1)
    try:
        if _error_msg is not NULL and errmsg_len > 0:
            msg = _error_msg[:errmsg_len]
        return msg
    finally:
        # The session is created with libssh2's default allocators (NULL
        # passed to libssh2_session_init_ex), so the returned buffer is
        # released with free(). free(NULL) is a no-op.
        free(_error_msg)
def last_errno(self):
    """Retrieve last error number from libssh2, if any.

    Returns 0 on no last error.

    :rtype: int
    """
    cdef int err_num
    with nogil:
        err_num = c_ssh2.libssh2_session_last_errno(self._session)
    return err_num
def set_last_error(self, int errcode, errmsg not None):
    """Set the session's last error code and message.

    :param errcode: Error code to set.
    :type errcode: int
    :param errmsg: Error message to set.
    :returns: Return value of ``libssh2_session_set_last_error``.
    :rtype: int
    """
    cdef bytes b_errmsg = to_bytes(errmsg)
    cdef char *c_errmsg = b_errmsg
    cdef int ret
    with nogil:
        ret = c_ssh2.libssh2_session_set_last_error(
            self._session, errcode, c_errmsg)
    return ret
def scp_recv(self, path not None):
    """Receive file via SCP.

    Deprecated in favour of scp_recv2 (requires libssh2 >= 1.7).

    :param path: File path to receive.
    :type path: str

    :returns: (channel, statinfo) tuple, or the result of
      ``handle_error_codes`` for the session's last error number when no
      channel could be opened.
    :rtype: tuple(:py:class:`ssh2.channel.Channel`,
      :py:class:`ssh2.statinfo.StatInfo`) or None"""
    cdef bytes b_path = to_bytes(path)
    cdef char *c_path = b_path
    cdef StatInfo stat_info = StatInfo()
    cdef c_ssh2.LIBSSH2_CHANNEL *chan
    with nogil:
        chan = c_ssh2.libssh2_scp_recv(
            self._session, c_path, stat_info._stat)
    if chan is not NULL:
        return PyChannel(chan, self), stat_info
    return handle_error_codes(
        c_ssh2.libssh2_session_last_errno(self._session))
def scp_recv2(self, path not None):
    """Receive file via SCP.

    :param path: File path to receive.
    :type path: str

    :returns: (channel, fileinfo) tuple, or the result of
      ``handle_error_codes`` for the session's last error number when no
      channel could be opened.
    :rtype: tuple(:py:class:`ssh2.channel.Channel`,
      :py:class:`ssh2.fileinfo.FileInfo`) or ``None``"""
    cdef FileInfo file_info = FileInfo()
    cdef bytes b_path = to_bytes(path)
    cdef char *c_path = b_path
    cdef c_ssh2.LIBSSH2_CHANNEL *chan
    with nogil:
        chan = c_ssh2.libssh2_scp_recv2(
            self._session, c_path, file_info._stat)
    if chan is not NULL:
        return PyChannel(chan, self), file_info
    return handle_error_codes(
        c_ssh2.libssh2_session_last_errno(self._session))
def scp_send(self, path not None, int mode, size_t size):
    """Deprecated in favour of scp_send64. Send file via SCP.

    :param path: Local file path to send.
    :type path: str
    :param mode: File mode.
    :type mode: int
    :param size: size of file
    :type size: int

    :returns: New channel, or the result of ``handle_error_codes`` for the
      session's last error number when no channel could be opened.
    :rtype: :py:class:`ssh2.channel.Channel`"""
    cdef bytes b_path = to_bytes(path)
    cdef char *c_path = b_path
    cdef c_ssh2.LIBSSH2_CHANNEL *chan
    with nogil:
        chan = c_ssh2.libssh2_scp_send(
            self._session, c_path, mode, size)
    if chan is not NULL:
        return PyChannel(chan, self)
    return handle_error_codes(
        c_ssh2.libssh2_session_last_errno(self._session))
def scp_send64(self, path not None, int mode, c_ssh2.libssh2_uint64_t size,
               time_t mtime, time_t atime):
    """Send file via SCP.

    :param path: Local file path to send.
    :type path: str
    :param mode: File mode.
    :type mode: int
    :param size: size of file
    :type size: int
    :param mtime: Modification time passed to libssh2.
    :type mtime: int
    :param atime: Access time passed to libssh2.
    :type atime: int

    :returns: New channel, or the result of ``handle_error_codes`` for the
      session's last error number when no channel could be opened.
    :rtype: :py:class:`ssh2.channel.Channel`"""
    cdef bytes b_path = to_bytes(path)
    cdef char *c_path = b_path
    cdef c_ssh2.LIBSSH2_CHANNEL *chan
    with nogil:
        chan = c_ssh2.libssh2_scp_send64(
            self._session, c_path, mode, size, mtime, atime)
    if chan is not NULL:
        return PyChannel(chan, self)
    return handle_error_codes(
        c_ssh2.libssh2_session_last_errno(self._session))
def publickey_init(self):
    """Initialise public key subsystem for managing remote server
    public keys.

    :raises: :py:class:`ssh2.exceptions.PublicKeyInitError` if the
      subsystem could not be initialised.
    """
    cdef c_pkey.LIBSSH2_PUBLICKEY *c_pkey_sys
    with nogil:
        c_pkey_sys = c_pkey.libssh2_publickey_init(self._session)
    if c_pkey_sys is not NULL:
        return PyPublicKeySystem(c_pkey_sys, self)
    raise PublicKeyInitError
def hostkey_hash(self, int hash_type):
    """Get computed digest of the remote system's host key.

    The digest is raw binary data, not hex encoded.

    :param hash_type: One of ``ssh2.session.LIBSSH2_HOSTKEY_HASH_MD5``,
      ``ssh2.session.LIBSSH2_HOSTKEY_HASH_SHA1`` or
      ``ssh2.session.LIBSSH2_HOSTKEY_HASH_SHA256``
    :type hash_type: int

    :returns: Digest bytes, or ``None`` if libssh2 returned no digest.
    :rtype: bytes"""
    cdef const char *_hash
    cdef bytes b_hash
    cdef Py_ssize_t hash_len
    with nogil:
        _hash = c_ssh2.libssh2_hostkey_hash(self._session, hash_type)
    if _hash is NULL:
        return
    # The digest is binary and may contain NUL bytes, so it must be copied
    # by its known length rather than treated as a C string.
    if hash_type == c_ssh2.LIBSSH2_HOSTKEY_HASH_MD5:
        hash_len = 16
    elif hash_type == c_ssh2.LIBSSH2_HOSTKEY_HASH_SHA1:
        hash_len = 20
    elif hash_type == c_ssh2.LIBSSH2_HOSTKEY_HASH_SHA256:
        hash_len = 32
    else:
        # Length not known for this hash type - fall back to C string
        # conversion as before.
        b_hash = _hash
        return b_hash
    b_hash = _hash[:hash_len]
    return b_hash
def hostkey(self):
    """Get server host key for this session.

    Returns key, key_type tuple where key_type is the host key type as
    reported by libssh2, for comparison against the
    ``ssh2.session.LIBSSH2_HOSTKEY_TYPE_*`` constants such as
    :py:class:`ssh2.session.LIBSSH2_HOSTKEY_TYPE_RSA`,
    :py:class:`ssh2.session.LIBSSH2_HOSTKEY_TYPE_DSS`, or
    :py:class:`ssh2.session.LIBSSH2_HOSTKEY_TYPE_UNKNOWN`

    :raises: :py:class:`ssh2.exceptions.SessionHostKeyError` if no host
      key could be retrieved.

    :rtype: tuple(bytes, int)"""
    cdef const char *c_key
    cdef size_t c_key_len = 0
    cdef int c_key_type = 0
    cdef bytes key_data = b""
    with nogil:
        c_key = c_ssh2.libssh2_session_hostkey(
            self._session, &c_key_len, &c_key_type)
    if c_key is NULL:
        raise SessionHostKeyError(
            "Error retrieving server host key for session")
    # Copy by explicit length, the key is binary data.
    key_data = c_key[:c_key_len]
    return key_data, c_key_type
def knownhost_init(self):
    """Initialise a collection of known hosts for this session.

    :raises: :py:class:`ssh2.exceptions.KnownHostError` if the collection
      could not be initialised.

    :rtype: :py:class:`ssh2.knownhost.KnownHost`"""
    cdef c_ssh2.LIBSSH2_KNOWNHOSTS *c_hosts
    with nogil:
        c_hosts = c_ssh2.libssh2_knownhost_init(self._session)
    if c_hosts is not NULL:
        return PyKnownHost(self, c_hosts)
    raise KnownHostError
def keepalive_config(self, bint want_reply, unsigned interval):
    """
    Configure keep alive settings.

    :param want_reply: True/False for reply wanted from server on keep
      alive messages being sent or not.
    :type want_reply: bool
    :param interval: Required keep alive interval. Set to ``0`` to disable
      keepalives.
    :type interval: int"""
    with nogil:
        c_ssh2.libssh2_keepalive_config(
            self._session, want_reply, interval)
def keepalive_send(self):
    """Send keepalive.

    Returns seconds remaining before next keep alive should be sent.

    The libssh2 return code is passed through ``handle_error_codes``
    before the remaining seconds are returned.

    :rtype: int"""
    cdef int c_seconds = 0
    cdef int rc
    with nogil:
        rc = c_ssh2.libssh2_keepalive_send(self._session, &c_seconds)
    handle_error_codes(rc)
    return c_seconds
def supported_algs(self, MethodType method_type):
    """Get supported algorithms for method type.

    :param method_type: Type of method to get
    :type method_type: :py:class:`MethodType`

    :returns: List of supported algorithms. Empty if libssh2 reports
      none.
    :rtype: list(str)
    """
    cdef const char **c_algs
    cdef int count
    cdef int i
    cdef list algs = []
    with nogil:
        count = c_ssh2.libssh2_session_supported_algs(
            self._session, method_type.value, &c_algs)
    if count < 0:
        return handle_error_codes(count)
    if count == 0:
        return []
    try:
        for i in range(count):
            algs.append(to_str(c_algs[i]))
    finally:
        # The array is allocated by libssh2 and must be released through it.
        with nogil:
            c_ssh2.libssh2_free(self._session, c_algs)
    return algs
def methods(self, MethodType method_type):
    """Get currently active algorithms for method type.

    :param method_type: Type of method to get
    :type method_type: :py:class:`MethodType`

    :returns: Active algorithms, or ``None`` if libssh2 returned no
      value for the method type.
    :rtype: str
    """
    cdef const char *_methods
    with nogil:
        _methods = c_ssh2.libssh2_session_methods(
            self._session, method_type.value)
    # libssh2 can return NULL here, for example when nothing has been
    # negotiated yet. NOTE(review): to_str is assumed not to accept NULL -
    # confirm against ssh2.utils.
    if _methods is NULL:
        return
    return to_str(_methods)
def method_pref(self, MethodType method_type, prefs not None):
    """Set preference for session method.

    See :py:func:`Session.supported_algs` for supported algorithms
    for method type.

    :param method_type: A supported session method LIBSSH2_METHOD_*
    :type method_type: :py:class:`MethodType`
    :param prefs: Algorithm preference for method type provided.
    :type prefs: str

    :returns: The libssh2 return code, after it has been passed through
      ``handle_error_codes``.
    :rtype: int
    :raises: :py:class:`ssh2.exceptions.MethodNotSupported`
      on unsupported method preference
    """
    cdef bytes b_prefs = to_bytes(prefs)
    cdef const char *c_pref_str = b_prefs
    cdef int ret
    with nogil:
        ret = c_ssh2.libssh2_session_method_pref(
            self._session, method_type.value, c_pref_str)
    handle_error_codes(ret)
    return ret