Tornado 的高级功能:处理长连接
liebian365 2024-11-26 05:52 3 浏览 0 评论
Tornado 是一个功能强大的 Python Web 框架和异步网络库,以处理大量同时连接的能力而闻名,这使其成为需要长连接的应用程序的绝佳选择。这些连接在聊天应用程序、实时通知和 IoT 设备等场景中特别有用,在这些场景中,维护客户端和服务器之间的开放通道至关重要。在本博文中,我们将深入探讨有助于处理长连接的 Tornado 高级功能,并提供一些实际演示来说明它们的用法。
了解长连接
长连接是指长时间保持打开状态的连接,允许连续发送和接收数据。这与传统的 HTTP 请求形成对比,后者打开连接、发送数据、接收响应,然后关闭连接。Tornado 的非阻塞异步 I/O 使其特别适合高效处理长连接。
关键概念和组件
在开始演示之前,让我们简要介绍一下处理长连接所必需的一些关键 Tornado 组件和概念:
- Tornado 的 IOLoop:Tornado 中处理非阻塞 I/O 操作的核心事件循环。它负责调度和执行异步任务。
- WebSocketHandler:用于管理 WebSocket 连接的专用 Tornado 处理程序,通常用于长连接。
- PeriodicCallback:Tornado 实用程序,允许您定期在 IOLoop 上运行回调,这对于发送心跳消息等任务很有用。
演示 1:实现基本 WebSocket 服务器
我们将从使用 Tornado 的 WebSocket 服务器的简单演示开始,该服务器允许长连接。
import tornado.ioloop
import tornado.web
import tornado.websocket
class EchoWebSocket(tornado.websocket.WebSocketHandler):
def open(self):
print("WebSocket opened")
self.write_message("Welcome to the WebSocket server!")
def on_message(self, message):
print(f"Received message: {message}")
self.write_message(f"You said: {message}")
def on_close(self):
print("WebSocket closed")
def make_app():
return tornado.web.Application([
(r"/websocket", EchoWebSocket),
])
if __name__ == "__main__":
app = make_app()
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
解释:
- EchoWebSocket 类:处理 WebSocket 连接。建立新连接时调用 open 方法,收到消息时调用 on_message,关闭连接时调用 on_close。
- make_app 函数:创建一个具有 WebSocket 连接单一路由的 Tornado 应用程序。
- IOLoop:启动 Tornado IOLoop 来监听连接。
演示 2:使用 PeriodicCallback 处理心跳
为了保持 WebSocket 连接处于活动状态,通常会发送定期的“heartbeat”消息。Tornado 的 PeriodicCallback 非常适合这项任务。
import tornado.ioloop
import tornado.web
import tornado.websocket
from tornado.ioloop import PeriodicCallback
class HeartbeatWebSocket(tornado.websocket.WebSocketHandler):
def open(self):
print("WebSocket opened")
self.write_message("Connected to Heartbeat WebSocket server!")
self.heartbeat = PeriodicCallback(self.send_heartbeat, 10000)
self.heartbeat.start()
def on_message(self, message):
print(f"Received message: {message}")
self.write_message(f"You said: {message}")
def on_close(self):
print("WebSocket closed")
self.heartbeat.stop()
def send_heartbeat(self):
self.write_message("heartbeat")
def make_app():
return tornado.web.Application([
(r"/heartbeat", HeartbeatWebSocket),
])
if __name__ == "__main__":
app = make_app()
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
解释:
- PeriodicCallback:每 10 秒调用一次 send_heartbeat 方法发送“heartbeat”消息,确保连接保持活动状态。
- on_close:连接关闭时停止心跳。
演示 3:使用 Tornado 管理多个客户端
处理长连接时,有效管理多个客户端非常重要。让我们扩展我们的 WebSocket 服务器以向所有连接的客户端广播消息。
import tornado.ioloop
import tornado.web
import tornado.websocket
clients = []
class BroadcastWebSocket(tornado.websocket.WebSocketHandler):
def open(self):
print("WebSocket opened")
clients.append(self)
self.write_message("Connected to Broadcast WebSocket server!")
def on_message(self, message):
print(f"Received message: {message}")
for client in clients:
client.write_message(f"Broadcast: {message}")
def on_close(self):
print("WebSocket closed")
clients.remove(self)
def make_app():
return tornado.web.Application([
(r"/broadcast", BroadcastWebSocket),
])
if __name__ == "__main__":
app = make_app()
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
解释:
- clients list:跟踪所有已连接客户端的全局列表。
- on_message:向所有已连接客户端广播传入消息。
以下是一些额外的演示,深入探讨了 Tornado 中处理长连接方面的更高级方面,重点介绍了背压管理、处理大文件上传以及将 Tornado 与其他异步框架集成等功能。
演示 4:使用 Tornado 管理背压
处理长连接时,尤其是在服务器以高速率向客户端发送数据的情况下,管理背压非常重要。当客户端无法足够快地处理传入数据时,就会发生背压,这可能导致性能下降或连接中断。
在 Tornado 中,您可以通过监视写入缓冲区的大小并相应地暂停/恢复数据流来管理背压。
import tornado.ioloop
import tornado.web
import tornado.websocket
from tornado import gen
class BackpressureWebSocket(tornado.websocket.WebSocketHandler):
def open(self):
print("WebSocket opened")
self.write_message("Connected to Backpressure WebSocket server!")
self.stream_data()
@gen.coroutine
def stream_data(self):
for i in range(10000): # Simulate sending a large amount of data
if self.ws_connection is None: # Connection closed
break
yield self.check_backpressure()
self.write_message(f"Message {i}")
yield gen.sleep(0.01) # Simulate processing delay
@gen.coroutine
def check_backpressure(self):
while self.ws_connection is not None and self.ws_connection.write_buffer_size > 1024 * 1024: # 1 MB buffer size limit
print("Backpressure detected, pausing stream...")
yield gen.sleep(0.1)
def on_close(self):
print("WebSocket closed")
def make_app():
return tornado.web.Application([
(r"/backpressure", BackpressureWebSocket),
])
if __name__ == "__main__":
app = make_app()
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
解释:
- check_backpressure 方法:监控写入缓冲区的大小,如果缓冲区超过阈值(本例中为 1 MB),则暂停数据流。这可以防止客户端不堪重负。
- gen.coroutine:用于创建一个异步生成器,将控制权交还给 IOLoop,允许其他任务运行。
演示 5:通过 WebSocket 处理大型文件上传
通过 WebSocket 处理大型文件上传需要谨慎管理,以避免阻塞服务器。Tornado 允许高效、无阻塞地处理大型数据流。
import tornado.ioloop
import tornado.web
import tornado.websocket
from tornado import gen
class FileUploadWebSocket(tornado.websocket.WebSocketHandler):
def open(self):
print("WebSocket opened")
self.file = open("uploaded_file.bin", "wb")
@gen.coroutine
def on_message(self, message):
print("Received chunk of data")
self.file.write(message) # Write data to file in chunks
yield self.check_backpressure()
@gen.coroutine
def check_backpressure(self):
if self.ws_connection is not None and self.ws_connection.write_buffer_size > 1024 * 1024: # 1 MB buffer size limit
print("Backpressure detected, slowing down file writing...")
yield gen.sleep(0.1)
def on_close(self):
print("WebSocket closed")
self.file.close()
def make_app():
return tornado.web.Application([
(r"/upload", FileUploadWebSocket),
])
if __name__ == "__main__":
app = make_app()
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
解释:
- on_message:处理传入的文件数据块并实时将其写入文件。
- check_backpressure:通过监控写入缓冲区大小,确保服务器不会被大数据流淹没。
演示 6:将 Tornado 与 Asyncio 集成以实现长寿命连接
Tornado 可以与 Python 的 asyncio 集成以利用其功能,例如 async/await 语法。通过此集成,您可以将 Tornado 的异步功能与 asyncio 的事件循环相结合,从而构建更复杂的长寿命连接场景。
import tornado.web
import tornado.websocket
import tornado.platform.asyncio
import asyncio
class AsyncioWebSocket(tornado.websocket.WebSocketHandler):
async def open(self):
print("WebSocket opened")
await self.write_message("Connected to Asyncio WebSocket server!")
await self.stream_data()
async def stream_data(self):
for i in range(10000):
if self.ws_connection is None:
break
await asyncio.sleep(0.01) # Simulate async processing
await self.write_message(f"Message {i}")
async def on_message(self, message):
print(f"Received message: {message}")
await self.write_message(f"You said: {message}")
def on_close(self):
print("WebSocket closed")
def make_app():
return tornado.web.Application([
(r"/asyncio", AsyncioWebSocket),
])
if __name__ == "__main__":
tornado.platform.asyncio.AsyncIOMainLoop().install()
app = make_app()
app.listen(8888)
asyncio.get_event_loop().run_forever()
解释:
- AsyncioWebSocket:使用 asyncio 扩展 Tornado 的 WebSocketHandler。open、on_message 和 stream_data 方法都是异步的,可使用 async/await 实现非阻塞操作。
- AsyncIOMainLoop:将 Tornado 的 IOLoop 与 asyncio 的事件循环集成,允许在 Tornado 中无缝使用 asyncio 的功能。
演示 7:使用 Tornado 实现长轮询
长轮询是一种使用 HTTP 模拟长连接的技术。客户端发送请求,服务器将其保持打开状态,直到有新数据可用或发生超时。当 WebSocket 支持有限时,此技术很有用。
import tornado.ioloop
import tornado.web
from tornado import gen
class LongPollingHandler(tornado.web.RequestHandler):
@gen.coroutine
def get(self):
self.write("Waiting for data...")
self.flush()
# Simulate waiting for data
yield gen.sleep(10) # Wait for 10 seconds
self.write("New data available!")
self.finish()
def make_app():
return tornado.web.Application([
(r"/longpoll", LongPollingHandler),
])
if __name__ == "__main__":
app = make_app()
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
解释:
- LongPollingHandler:处理长轮询请求。 get 方法模拟响应前的延迟(例如,等待新数据可用)。
- flush():将初始响应发送到客户端并保持连接打开,直到调用 finish() 方法。
写在最后:
- 这些高级演示展示了如何使用 Tornado 来管理涉及长寿命连接的更复杂场景,例如处理背压、大文件上传、与 asyncio 集成,甚至实现长轮询。Tornado 的灵活性和效率使其成为构建需要持久、高性能连接的可扩展应用程序的可靠选择。
- 处理长连接是 Tornado 的优势之一,使其成为构建实时、响应式 Web 应用程序的强大工具。在本博文中,我们介绍了 Tornado 中长连接的基本知识,并提供了实现 WebSocket 服务器、发送心跳消息和管理多个客户端的演示等。这些示例应为构建需要持久连接的更复杂应用程序奠定坚实的基础。请随意尝试这些示例并扩展它们以满足您的特定需求。无论您构建的是聊天应用程序、实时通知系统还是复杂的物联网平台,Tornado 的长连接功能都可以帮助您高效地实现目标。
相关推荐
- 快递查询教程,批量查询物流,一键管理快递
-
作为商家,每天需要查询许许多多的快递单号,面对不同的快递公司,有没有简单一点的物流查询方法呢?小编的回答当然是有的,下面随小编一起来试试这个新技巧。需要哪些工具?安装一个快递批量查询高手快递单号怎么快...
- 一键自动查询所有快递的物流信息 支持圆通、韵达等多家快递
-
对于各位商家来说拥有一个好的快递软件,能够有效的提高自己的工作效率,在管理快递单号的时候都需要对单号进行表格整理,那怎么样能够快速的查询所有单号信息,并自动生成表格呢?1、其实方法很简单,我们不需要一...
- 快递查询单号查询,怎么查物流到哪了
-
输入单号怎么查快递到哪里去了呢?今天小编给大家分享一个新的技巧,它支持多家快递,一次能查询多个单号物流,还可对查询到的物流进行分析、筛选以及导出,下面一起来试试。需要哪些工具?安装一个快递批量查询高手...
- 3分钟查询物流,教你一键批量查询全部物流信息
-
很多朋友在问,如何在短时间内把单号的物流信息查询出来,查询完成后筛选已签收件、筛选未签收件,今天小编就分享一款物流查询神器,感兴趣的朋友接着往下看。第一步,运行【快递批量查询高手】在主界面中点击【添...
- 快递单号查询,一次性查询全部物流信息
-
现在各种快递的查询方式,各有各的好,各有各的劣,总的来说,还是有比较方便的。今天小编就给大家分享一个新的技巧,支持多家快递,一次能查询多个单号的物流,还能对查询到的物流进行分析、筛选以及导出,下面一起...
- 快递查询工具,批量查询多个快递快递单号的物流状态、签收时间
-
最近有朋友在问,怎么快速查询单号的物流信息呢?除了官网,还有没有更简单的方法呢?小编的回答当然是有的,下面一起来看看。需要哪些工具?安装一个快递批量查询高手多个京东的快递单号怎么快速查询?进入快递批量...
- 快递查询软件,自动识别查询快递单号查询方法
-
当你拥有多个快递单号的时候,该如何快速查询物流信息?比如单号没有快递公司时,又该如何自动识别再去查询呢?不知道如何操作的宝贝们,下面随小编一起来试试。需要哪些工具?安装一个快递批量查询高手快递单号若干...
- 教你怎样查询快递查询单号并保存物流信息
-
商家发货,快递揽收后,一般会直接手动复制到官网上一个个查询物流,那么久而久之,就会觉得查询变得特别繁琐,今天小编给大家分享一个新的技巧,下面一起来试试。教程之前,我们来预览一下用快递批量查询高手...
- 简单几步骤查询所有快递物流信息
-
在高峰期订单量大的时候,可能需要一双手当十双手去查询快递物流,但是由于逐一去查询,效率极低,追踪困难。那么今天小编给大家分享一个新的技巧,一次能查询多个快递单号的物流,下面一起来学习一下,希望能给大家...
- 物流单号查询,如何查询快递信息,按最后更新时间搜索需要的单号
-
最近有很多朋友在问,如何通过快递单号查询物流信息,并按最后更新时间搜索出需要的单号呢?下面随小编一起来试试吧。需要哪些工具?安装一个快递批量查询高手快递单号若干怎么快速查询?运行【快递批量查询高手】...
- 连续保存新单号功能解析,导入单号查询并自动识别批量查快递信息
-
快递查询已经成为我们日常生活中不可或缺的一部分。然而,面对海量的快递单号,如何高效、准确地查询每一个快递的物流信息,成为了许多人头疼的问题。幸运的是,随着科技的进步,一款名为“快递批量查询高手”的软件...
- 快递查询教程,快递单号查询,筛选更新量为1的单号
-
最近有很多朋友在问,怎么快速查询快递单号的物流,并筛选出更新量为1的单号呢?今天小编给大家分享一个新方法,一起来试试吧。需要哪些工具?安装一个快递批量查询高手多个快递单号怎么快速查询?运行【快递批量查...
- 掌握批量查询快递动态的技巧,一键查找无信息记录的两种方法解析
-
在快节奏的商业环境中,高效的物流查询是确保业务顺畅运行的关键。作为快递查询达人,我深知时间的宝贵,因此,今天我将向大家介绍一款强大的工具——快递批量查询高手软件。这款软件能够帮助你批量查询快递动态,一...
- 从复杂到简单的单号查询,一键清除单号中的符号并批量查快递信息
-
在繁忙的商务与日常生活中,快递查询已成为不可或缺的一环。然而,面对海量的单号,逐一查询不仅耗时费力,还容易出错。现在,有了快递批量查询高手软件,一切变得简单明了。只需一键,即可搞定单号查询,一键处理单...
- 物流单号查询,在哪里查询快递
-
如果在快递单号多的情况,你还在一个个复制粘贴到官网上手动查询,是一件非常麻烦的事情。于是乎今天小编给大家分享一个新的技巧,下面一起来试试。需要哪些工具?安装一个快递批量查询高手快递单号怎么快速查询?...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- wireshark怎么抓包 (75)
- qt sleep (64)
- cs1.6指令代码大全 (55)
- factory-method (60)
- sqlite3_bind_blob (52)
- hibernate update (63)
- c++ base64 (70)
- nc 命令 (52)
- wm_close (51)
- epollin (51)
- sqlca.sqlcode (57)
- lua ipairs (60)
- tv_usec (64)
- 命令行进入文件夹 (53)
- postgresql array (57)
- statfs函数 (57)
- .project文件 (54)
- lua require (56)
- for_each (67)
- c#工厂模式 (57)
- wxsqlite3 (66)
- dmesg -c (58)
- fopen参数 (53)
- tar -zxvf -c (55)
- 速递查询 (52)