A Python SDK by LUCIT to use the Bybit Websocket APIs (live and testnet) in a simple, fast, flexible, robust and fully-featured way.
The core functions work. Websocket connections to public endpoints can be established and are stable. (No long-term tests!)
If you would like to take part in the test, please contact us in the chat!
Part of 'UNICORN Trading Suite'.
Get help with the integration of the UNICORN Trading Suite modules!
To run modules of the UNICORN Trading Suite you need a valid license!
# Buffer the received data in the stream_buffer and pop it in a loop.
from unicorn_bybit_websocket_api import BybitWebSocketApiManager

bybit_wsm = BybitWebSocketApiManager(exchange="bybit.com")
bybit_wsm.create_stream(endpoint="public/linear", channels=['kline.1'], markets=['btcusdt', 'bnbbtc', 'ethbtc'])

while True:
    oldest_data_from_stream_buffer = bybit_wsm.pop_stream_data_from_stream_buffer()
    if oldest_data_from_stream_buffer:
        print(oldest_data_from_stream_buffer)
# Alternatively, pass a callback function that processes each received record.
from unicorn_bybit_websocket_api import BybitWebSocketApiManager


def process_new_receives(stream_data):
    print(str(stream_data))


bybit_wsm = BybitWebSocketApiManager(exchange="bybit.com")
bybit_wsm.create_stream(endpoint="public/linear", channels=['kline.1'],
                        markets=['btcusdt', 'bnbbtc', 'ethbtc'],
                        process_stream_data=process_new_receives)
# Or pass an async callback (a coroutine function) instead.
from unicorn_bybit_websocket_api import BybitWebSocketApiManager
import asyncio


async def process_new_receives(stream_data):
    print(stream_data)
    await asyncio.sleep(1)


bybit_wsm = BybitWebSocketApiManager()
bybit_wsm.create_stream(endpoint="public/linear", channels=['kline.1'],
                        markets=['btcusdt', 'bnbbtc', 'ethbtc'],
                        process_stream_data_async=process_new_receives)
All of the data collection methods presented have their own advantages and disadvantages. However, awaiting the stream data from an asyncio queue, as in the following example, is the generally recommended method for processing data from streams.
from unicorn_bybit_websocket_api import BybitWebSocketApiManager
import asyncio


async def main():
    async def process_asyncio_queue(stream_id=None):
        print(f"Start processing the data from stream '{bybit_wsm.get_stream_label(stream_id)}':")
        while bybit_wsm.is_stop_request(stream_id) is False:
            data = await bybit_wsm.get_stream_data_from_asyncio_queue(stream_id)
            print(data)
            bybit_wsm.asyncio_queue_task_done(stream_id)

    bybit_wsm.create_stream(endpoint="public/linear",
                            channels=['trade'],
                            markets=['ethbtc', 'btcusdt'],
                            stream_label="TRADES",
                            process_asyncio_queue=process_asyncio_queue)
    # Keep the coroutine alive until the manager is asked to stop.
    while not bybit_wsm.is_manager_stopping():
        await asyncio.sleep(1)


with BybitWebSocketApiManager(exchange='bybit.com') as bybit_wsm:
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\r\nGracefully stopping ...")
    except Exception as e:
        print(f"\r\nERROR: {e}\r\nGracefully stopping ...")
Basically that's it, but there are more options.
markets =
channels =
bybit_wsm.subscribe_to_stream(stream_id=stream_id, channels=channels, markets=markets)
bybit_wsm.unsubscribe_from_stream(stream_id=stream_id, markets=markets)
bybit_wsm.unsubscribe_from_stream(stream_id=stream_id, channels=channels)
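As a rough illustration of what subscribing to several channels and markets amounts to at the protocol level, the sketch below combines each channel with each market into Bybit-style topic strings and wraps them in a subscribe or unsubscribe request. The helper names `build_topics` and `build_subscribe_request` are hypothetical and not part of this library, and the channel and market values are placeholders chosen for illustration. How UNICORN Bybit WebSocket API assembles its requests internally may differ.

```python
import json
from itertools import product


def build_topics(channels, markets):
    """Combine every channel with every market into '<channel>.<SYMBOL>' topics."""
    return [f"{channel}.{market.upper()}" for channel, market in product(channels, markets)]


def build_subscribe_request(channels, markets, op="subscribe"):
    """Return the JSON text of a subscribe/unsubscribe request for the topics."""
    return json.dumps({"op": op, "args": build_topics(channels, markets)})


# Placeholder values, chosen only for illustration.
example_channels = ["kline.1", "kline.5"]
example_markets = ["btcusdt", "ethusdt"]

print(build_topics(example_channels, example_markets))
print(build_subscribe_request(example_channels, example_markets))
```

Passing `op="unsubscribe"` produces the matching request for removing the same topics.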
When you instantiate UNICORN Bybit WebSocket API with `with`, `bybit_wsm.stop_manager()` is automatically executed upon exiting the `with`-block.
with BybitWebSocketApiManager() as bybit_wsm:
    bybit_wsm.create_stream(channels="kline.1", markets="btcusdt", stream_label="KLINE_1m")
Without `with`, you must explicitly execute `bybit_wsm.stop_manager()` yourself.
bybit_wsm.stop_manager()
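The guarantee that `with` gives can be illustrated with a small stand-in class. `DemoManager` below is hypothetical and only mimics the behaviour described above: its `__exit__` method calls `stop_manager()`, so the cleanup runs even when the block raises. It is a sketch of the pattern, not the library's implementation.

```python
class DemoManager:
    """Hypothetical stand-in that mimics a manager with a stop_manager() method."""

    def __init__(self):
        self.stopped = False

    def stop_manager(self):
        self.stopped = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Always stop, whether the block finished normally or raised.
        self.stop_manager()
        return False  # do not swallow exceptions


with DemoManager() as manager:
    pass
print(manager.stopped)

failing = DemoManager()
try:
    with failing:
        raise RuntimeError("simulated failure inside the with-block")
except RuntimeError:
    pass
print(failing.stopped)
```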
Usually you want to know when a stream is working and when it is not. This tells you when your own system is currently "blind", so that you can close open positions to be on the safe side, take into account that indicators will now provide incorrect values, or reload the missing data via REST as an alternative.
For this purpose, the UNICORN Bybit WebSocket API provides so-called `stream_signals`, which are used to tell your code in real time when a stream is connected, when it received its first data record, when it was disconnected and stopped, and when the stream cannot be restored.
from unicorn_bybit_websocket_api import BybitWebSocketApiManager
import time


def process_stream_signals(signal_type=None, stream_id=None, data_record=None, error_msg=None):
    print(f"Received stream_signal for stream '{bybit_wsm.get_stream_label(stream_id=stream_id)}': "
          f"{signal_type} - {stream_id} - {data_record} - {error_msg}")


with BybitWebSocketApiManager(process_stream_signals=process_stream_signals) as bybit_wsm:
    bybit_wsm.create_stream(channels="trade", markets="btcusdt", stream_label="TRADES")
    time.sleep(2)
    print("Waiting 5 seconds and then stop the stream ...")
    time.sleep(5)
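One way to act on these signals is to keep a small per-stream health record and check it before trading. The sketch below uses the same callback signature as the example above. The `StreamHealth` class and its `is_blind()` method are hypothetical helpers, and it is an assumption that the signal types arrive as the plain strings `CONNECT`, `FIRST_RECEIVED_DATA`, `DISCONNECT`, `STOP` and `STREAM_UNREPAIRABLE`.

```python
class StreamHealth:
    """Hypothetical helper that remembers the last signal seen per stream."""

    # Signals after which no data can be expected from the stream (assumed names).
    BLIND_SIGNALS = {"DISCONNECT", "STOP", "STREAM_UNREPAIRABLE"}

    def __init__(self):
        self.last_signal = {}

    def process_stream_signals(self, signal_type=None, stream_id=None,
                               data_record=None, error_msg=None):
        # Same keyword arguments as the callback in the example above.
        self.last_signal[stream_id] = signal_type

    def is_blind(self, stream_id):
        # Streams that never sent a signal count as blind as well.
        signal = self.last_signal.get(stream_id)
        return signal is None or signal in self.BLIND_SIGNALS


health = StreamHealth()
health.process_stream_signals(signal_type="CONNECT", stream_id="demo")
health.process_stream_signals(signal_type="FIRST_RECEIVED_DATA", stream_id="demo")
print(health.is_blind("demo"))
health.process_stream_signals(signal_type="DISCONNECT", stream_id="demo")
print(health.is_blind("demo"))
```

Under these assumptions, `health.process_stream_signals` could be passed as the `process_stream_signals` argument so that the record stays up to date while streams run.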
Discover even more possibilities, use this script to stream everything from "bybit.com" or try our examples!
This should be known by everyone using this lib:
The Python package UNICORN Bybit WebSocket API provides an API to the Bybit Websocket APIs (live and testnet).
Fully managed websockets and 100% auto-reconnect! Also handles maintenance windows!
No memory leaks from Python version 3.7 to 3.12!
The full UTS stack is delivered as a compiled C extension for maximum performance.
| Exchange | Exchange string | WS | WS API |
|---------------|---------------------|----|--------|
| Bybit | `bybit.com` | | |
| Bybit Testnet | `bybit.com-testnet` | | |
Streams are processed asynchronously/concurrently (Python asyncio) and each stream is started in a separate thread, so you don't need to deal with asyncio in your code! But you can consume with `await`, if you want!
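The thread-per-stream idea can be sketched with the standard library alone: each thread runs its own asyncio event loop and hands records to ordinary synchronous code through a thread-safe queue. The function `run_stream` and the fake records are hypothetical stand-ins, so this shows the general pattern rather than how the library is implemented.

```python
import asyncio
import queue
import threading


def run_stream(name, out_queue, count=3):
    """Run a private asyncio event loop in this thread and push records to a queue."""

    async def produce():
        for number in range(count):
            await asyncio.sleep(0)  # stand-in for awaiting a websocket message
            out_queue.put(f"{name}-{number}")

    asyncio.run(produce())


records = queue.Queue()
threads = [threading.Thread(target=run_stream, args=(label, records))
           for label in ("s1", "s2")]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# The consuming code is plain synchronous Python.
received = sorted(records.get() for _ in range(records.qsize()))
print(received)
```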
Supports subscribe/unsubscribe on all exchanges! (Take a look at the maximum number of supported subscriptions per stream in the endpoint configuration overview!)
UNICORN Bybit WebSocket API respects Bybit's API guidelines and protects you from avoidable reconnects and bans.
Support for multiple private `!userData` streams with different `api_key` and `api_secret`. (example_multiple_userdata_streams.py)
Pick up the received data from the `stream_buffer` (FIFO or LIFO). If you cannot store your data because of a temporary technical issue, you can kick the data back to the `stream_buffer`, which keeps the received records in RAM until you are able to process the data in the normal way again. Learn more!
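The pick-up and kick-back idea can be sketched with a plain `collections.deque`. The `DemoStreamBuffer` class and its method names are hypothetical and only illustrate the FIFO/LIFO and kick-back behaviour described above. Returning a kicked-back record to the front of the buffer is a design choice of this sketch, not a statement about the library.

```python
from collections import deque


class DemoStreamBuffer:
    """Hypothetical in-memory buffer with FIFO/LIFO pop and kick-back."""

    def __init__(self):
        self._records = deque()

    def add(self, record):
        self._records.append(record)

    def pop(self, mode="FIFO"):
        # Return None when the buffer is empty.
        if not self._records:
            return None
        return self._records.popleft() if mode == "FIFO" else self._records.pop()

    def kick_back(self, record):
        # Put an unprocessed record back at the front, so it is the next FIFO pop.
        self._records.appendleft(record)


def store(record):
    """Simulated storage step that fails for one specific record."""
    if record == "b":
        raise IOError("simulated temporary outage")


buffer = DemoStreamBuffer()
for item in ("a", "b", "c"):
    buffer.add(item)

stored = []
while (record := buffer.pop()) is not None:
    try:
        store(record)
        stored.append(record)
    except IOError:
        buffer.kick_back(record)
        break  # retry later, nothing is lost
print(stored)
```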
Use separate `stream_buffers` for specific streams or users!
Watch the `stream_signals` to receive `CONNECT`, `FIRST_RECEIVED_DATA`, `DISCONNECT`, `STOP` and `STREAM_UNREPAIRABLE` signals from your streams! Learn more!
Get the received data unchanged as received, as a Python dictionary, or converted with UnicornFy into well-formed Python dictionaries. Use the `output` parameter of `create_stream()` to control the output format.
Helpful management features like `clear_asyncio_queue()`, `clear_stream_buffer()`, `get_bybit_api_status()`, `get_current_receiving_speed()`, `get_errors_from_endpoints()`, `get_limit_of_subscriptions_per_stream()`, `get_request_id()`, `get_result_by_request_id()`, `get_results_from_endpoints()`, `get_stream_buffer_length()`, `get_stream_info()`, `get_stream_list()`, `get_stream_id_by_label()`, `get_stream_statistic()`, `get_stream_subscriptions()`, `get_version()`, `is_update_available()`, `get_stream_data_from_asyncio_queue()`, `pop_stream_data_from_stream_buffer()`, `print_summary()`, `replace_stream()`, `set_stream_label()`, `set_ringbuffer_error_max_size()`, `subscribe_to_stream()`, `stop_stream()`, `unsubscribe_from_stream()`, `wait_till_stream_has_started()` and many more! Explore them here.
Monitor the status of the created `BybitWebSocketApiManager()` instance within your code with `get_monitoring_status_plain()` and specific streams with `get_stream_info()`.
Available as a package via `pip` and `conda` as a precompiled C extension with stub files for improved IntelliSense functions and source code for easier debugging. To the installation.
Integration of test cases and examples.
Customizable base URL.
Socks5 Proxy support:
bybit_wsm = BybitWebSocketApiManager(exchange="bybit.com", socks5_proxy_server="127.0.0.1:9050")
`conda config --add channels conda-forge`
`conda config --add channels lucit`
`conda install -c lucit unicorn-bybit-websocket-api`
`conda update -c lucit unicorn-bybit-websocket-api`
Run in bash:
`pip install https://github.com/LUCIT-Systems-and-Development/unicorn-bybit-websocket-api/archive/$(curl -s https://api.github.com/repos/LUCIT-Systems-and-Development/unicorn-bybit-websocket-api/releases/latest | grep -oP '"tag_name": "\K(.*)(?=")').tar.gz --upgrade`
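For readers who prefer not to rely on `grep -oP`, the same lookup can be sketched in Python: read `tag_name` from the JSON returned by the GitHub releases endpoint and build the archive URL from it. The function name `archive_url_from_release` is hypothetical, and the sample JSON is a made-up stand-in for the API response, so the sketch runs without network access.

```python
import json

REPO = "LUCIT-Systems-and-Development/unicorn-bybit-websocket-api"


def archive_url_from_release(release_json):
    """Build the source archive URL from a GitHub 'latest release' JSON document."""
    tag_name = json.loads(release_json)["tag_name"]
    return f"https://github.com/{REPO}/archive/{tag_name}.tar.gz"


# Made-up sample response; a real one would be fetched from
# https://api.github.com/repos/<REPO>/releases/latest
sample_response = '{"tag_name": "0.1.0", "name": "sample"}'
print(archive_url_from_release(sample_response))
```

The resulting URL can then be passed to `pip install ... --upgrade` in the same way as in the bash command.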
Use the below command with the version (such as 0.1.0) you determined
[here](https://github.com/LUCIT-Systems-and-Development/unicorn-bybit-websocket-api/releases/latest):
`pip install https://github.com/LUCIT-Systems-and-Development/unicorn-bybit-websocket-api/archive/0.1.0.tar.gz --upgrade`
This is not a release version and cannot be considered stable!
`pip install https://github.com/LUCIT-Systems-and-Development/unicorn-bybit-websocket-api/tarball/master --upgrade`
[https://unicorn-bybit-websocket-api.docs.lucit.tech/changelog.html](https://unicorn-bybit-websocket-api.docs.lucit.tech/changelog.html)
[https://www.lucit.tech/unicorn-bybit-websocket-api.html](https://www.lucit.tech/unicorn-bybit-websocket-api.html)
[https://github.com/LUCIT-Systems-and-Development/unicorn-bybit-websocket-api/wiki](https://github.com/LUCIT-Systems-and-Development/unicorn-bybit-websocket-api/wiki)
List of planned features - click if you need one of them or suggest a new feature!
Before you report a bug, try the latest release. If the issue still exists, provide the error trace, OS and Python version and explain how to reproduce the error. A demo script is appreciated.
If you don't find an issue related to your topic, please open a new issue!
UNICORN Bybit WebSocket API is an open source project that welcomes contributions, which can be anything from simple documentation fixes and dead-link reports to new features. To contribute, follow this guide.
We open source!
This project is for informational purposes only. You should not construe this information or any other material as legal, tax, investment, financial or other advice. Nothing contained herein constitutes a solicitation, recommendation, endorsement or offer by us or any third party provider to buy or sell any securities or other financial instruments in this or any other jurisdiction in which such solicitation or offer would be unlawful under the securities laws of such jurisdiction.
Under no circumstances will we be responsible or liable for any claims, damages, losses, expenses, costs or liabilities of any kind, including but not limited to direct or indirect damages for loss of profits.
Do you need a developer, operator or consultant? Contact us for a non-binding initial consultation!