|
1 | 1 | # kafkaproxy |
2 | | -kafka的代理对象 |
| 2 | + |
| 3 | +kafka生产者和消费者的代理工具. |
| 4 | + |
| 5 | +代理对象用于推迟初始化.我们可以在需要的地方用代理对象的全局变量直接编写逻辑,避免被代理的对象来回在函数间传递. |
| 6 | + |
| 7 | +## 特性 |
| 8 | + |
| 9 | ++ 支持代理`kafka-python`,`aiokafka`和`confluent-kafka`的生产者消费者对象. |
| 10 | ++ 提供统一通用的生产者消费者接口包装 |
| 11 | + |
| 12 | +## 安装 |
| 13 | + |
| 14 | ++ 只安装本项目不安装被代理对象的依赖: `pip install kafkaproxy` |
| 15 | ++ 安装本项目同时确定要代理的对象包为`kafka-python`: `pip install kafkaproxy[kafka]` |
| 16 | ++ 安装本项目同时确定要代理的对象包为`aiokafka`: `pip install kafkaproxy[aio]` |
| 17 | ++ 安装本项目同时确定要代理的对象包为`confluent-kafka`: `pip install kafkaproxy[confluent]` |
| 18 | + |
| 19 | +## 使用 |
| 20 | + |
| 21 | +本项目支持代理3种kafka模块中的对应模块,使用枚举`KafkaType`中的取值在调用方法`initialize_from_addresses`初始化时指定. |
| 22 | +代理对象除了原样代理对象外还提供了生产者和消费者的统一通用接口包装. |
| 23 | +由于对应的方法是动态绑定的,因此如果需要他们的typehints可以用`typing.cast`将代理对象转化为对应的协议对象 |
| 24 | + |
| 25 | ++ 同步接口生产者使用`ProducerProtocol` |
| 26 | ++ 异步接口生产者使用`AioProducerProtocol` |
| 27 | ++ 同步接消费产者使用`ConsumerProtocol` |
| 28 | ++ 异步接消费产者使用`AioConsumerProtocol` |
| 29 | + |
| 30 | +> 代理`kafka-python`或`confluent-kafka`生产者 |
| 31 | +
|
| 32 | +```python |
| 33 | +from kafkaproxy import ProducerProxy, KafkaType, ProducerProtocol |
| 34 | +from typing import cast |
| 35 | +import time |
| 36 | +kafkap = ProducerProxy() |
| 37 | + |
| 38 | + |
| 39 | +def run() -> None: |
| 40 | + p = cast(ProducerProtocol, kafkap) |
| 41 | + with p.mount() as cli: |
| 42 | + for i in range(10): |
| 43 | + cli.publish("topic1", f"send {i}") |
| 44 | + time.sleep(0.1) |
| 45 | + |
| 46 | + |
| 47 | +# kafkap.initialize_from_addresses("localhost:9094", kafka_type=KafkaType.ConfluentKafka, acks="all") |
| 48 | +kafkap.initialize_from_addresses("localhost:9094", kafka_type=KafkaType.Kafka) |
| 49 | +try: |
| 50 | + print("start publishing") |
| 51 | + run() |
| 52 | +finally: |
| 53 | + print("stoped") |
| 54 | +``` |
| 55 | + |
| 56 | +> 代理`kafka-python`或`confluent-kafka`消费者 |
| 57 | +
|
| 58 | +```python |
| 59 | +from kafkaproxy import ConsumerProxy, KafkaType, ConsumerProtocol |
| 60 | +from typing import cast |
| 61 | + |
| 62 | +kafkac = ConsumerProxy() |
| 63 | + |
| 64 | + |
| 65 | +def run() -> None: |
| 66 | + c = cast(ConsumerProtocol, kafkac) |
| 67 | + with c.watch() as g: |
| 68 | + for record in g: |
| 69 | + print(record.value) |
| 70 | + |
| 71 | + |
| 72 | +# kafkac.initialize_from_addresses("localhost:9094", "topic1", group_id="test2", kafka_type=KafkaType.Kafka) |
| 73 | +kafkac.initialize_from_addresses("localhost:9094", "topic1", group_id="test2", kafka_type=KafkaType.ConfluentKafka) |
| 74 | +try: |
| 75 | + print("start watching") |
| 76 | + run() |
| 77 | +finally: |
| 78 | + print("stoped") |
| 79 | + |
| 80 | +``` |
| 81 | + |
| 82 | +> 代理`aiokafka`生产者 |
| 83 | +
|
| 84 | +```python |
| 85 | +import asyncio |
| 86 | +from kafkaproxy import ProducerProxy, KafkaType, AioProducerProtocol |
| 87 | +from typing import cast |
| 88 | + |
| 89 | +kafkap = ProducerProxy() |
| 90 | + |
| 91 | + |
| 92 | +async def run() -> None: |
| 93 | + p = cast(AioProducerProtocol, kafkap) |
| 94 | + async with p.mount() as cli: |
| 95 | + for i in range(10): |
| 96 | + await cli.publish("topic1", f"send {i}") |
| 97 | + await asyncio.sleep(0.1) |
| 98 | + |
| 99 | + |
| 100 | +async def main() -> None: |
| 101 | + kafkap.initialize_from_addresses("localhost:9094", kafka_type=KafkaType.AioKafka, acks="all") |
| 102 | + await run() |
| 103 | + |
| 104 | + |
| 105 | +try: |
| 106 | + print("start watching") |
| 107 | + asyncio.run(main()) |
| 108 | +finally: |
| 109 | + print("stoped") |
| 110 | + |
| 111 | +``` |
| 112 | + |
| 113 | +> 代理`aiokafka`消费者 |
| 114 | +
|
| 115 | +```python |
| 116 | +import asyncio |
| 117 | +from kafkaproxy import ConsumerProxy, KafkaAutoOffsetReset, KafkaType, AioConsumerProtocol |
| 118 | +from typing import cast |
| 119 | + |
| 120 | +kafkac = ConsumerProxy() |
| 121 | + |
| 122 | + |
| 123 | +async def run() -> None: |
| 124 | + c = cast(AioConsumerProtocol, kafkac) |
| 125 | + async with c.watch() as g: |
| 126 | + async for record in g: |
| 127 | + print(record.value) |
| 128 | + |
| 129 | + |
| 130 | +async def main() -> None: |
| 131 | + kafkac.initialize_from_addresses("localhost:9094", "topic1", group_id="test2", kafka_type=KafkaType.AioKafka, auto_offset_reset=KafkaAutoOffsetReset.earliest) |
| 132 | + await run() |
| 133 | + |
| 134 | + |
| 135 | +try: |
| 136 | + print("start watching") |
| 137 | + asyncio.run(main()) |
| 138 | +finally: |
| 139 | + print("stoped") |
| 140 | + |
| 141 | +``` |
0 commit comments