java与python之间的混合开发
java與python之間的混合開(kāi)發(fā)
1. 項(xiàng)目需求
使用java進(jìn)行進(jìn)行數(shù)據(jù)庫(kù)的訪問(wèn),并對(duì)查詢出的數(shù)據(jù)進(jìn)行的數(shù)據(jù)清理,使用python進(jìn)行無(wú)監(jiān)督分類
應(yīng)用場(chǎng)景:
. 整個(gè)項(xiàng)目是BS架構(gòu),基于SpringBoot2.0
. 業(yè)務(wù)端主要是通過(guò)Java來(lái)實(shí)現(xiàn)后臺(tái)與數(shù)據(jù)庫(kù)的連接與交互
. 算法端主要通過(guò)python實(shí)現(xiàn)
實(shí)現(xiàn)的目標(biāo):
2.方案
可以通過(guò)udp或者tcp協(xié)議來(lái)傳遞數(shù)據(jù)
缺點(diǎn):a. 不能異步通信;b.安全性不高;c.調(diào)試麻煩;d.數(shù)據(jù)容易丟失
通過(guò)網(wǎng)絡(luò)接口(Restful)的方式,調(diào)用python服務(wù)(通過(guò)http的POST請(qǐng)求中的Body將數(shù)據(jù)傳入,沒(méi)有測(cè)試過(guò)數(shù)據(jù)量)
通過(guò)消息中間鍵,來(lái)作為數(shù)據(jù)傳遞和消息傳遞的媒介,通過(guò)隊(duì)列的方式來(lái)傳遞數(shù)據(jù),實(shí)現(xiàn)異步通信,并且通過(guò)中間鍵作為中介有管理系統(tǒng),安全性高,有對(duì)應(yīng)的管理界面調(diào)試比較簡(jiǎn)單,能夠查看推送的數(shù)據(jù)是什么;使用隊(duì)列來(lái)作為日志信息的輸入和輸出;
-
最終確定的方案:選用方案二
使用消息中間鍵的好處: -
數(shù)據(jù)不容易丟
-
能夠?qū)崿F(xiàn)異步通信
異步方式指消息發(fā)送方在發(fā)送消息時(shí)不必知道接收方的狀態(tài),更無(wú)需等待接收方的回復(fù),而接收方在收到消息時(shí)也不必知道發(fā)送方的目前狀態(tài),更無(wú)需進(jìn)行同步的消息處理,它們之間的連接完全是松耦合的,通信是非阻塞的,這種異步通信方式是由消息中間件中的消息隊(duì)列及其服務(wù)機(jī)制保障的
-
能夠消峰(通過(guò)隊(duì)列的形式,防止一直訪問(wèn)服務(wù))
用消息隊(duì)列來(lái)緩沖瞬時(shí)流量,把同步的直接調(diào)用轉(zhuǎn)換成異步的間接推送,中間通過(guò)一個(gè)隊(duì)列在一端承接瞬時(shí)的流量洪峰,在另一端平滑地將消息推送出去。
3.技術(shù)選型
Java端使用springboot2.0框架來(lái)搭建服務(wù),使用MyBattisPlus操作數(shù)據(jù)庫(kù)
消息中間鍵:本文針對(duì)的數(shù)據(jù)量不算特別大,也不需要很高的吞吐量,所以選用穩(wěn)定且功能完備,spring集成的ActiveMQ
python端使用stomp消息傳輸框架來(lái)對(duì)ActiveMq的消息隊(duì)列進(jìn)行監(jiān)督
4. 代碼實(shí)現(xiàn)
SpringBoot2.0集成ActiveMq
添加pom文件
<!--ActiveMq--> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId><version>1.5.0.RELEASE</version> </dependency> <!--消息隊(duì)列連接池--> <dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.15.0</version> </dependency>構(gòu)建一個(gè)生產(chǎn)者服務(wù)
java和ActiveMq相關(guān)的配置文件
- 通過(guò)yml文件配置
- config配置,注意**@Bean**注解的是對(duì)象的初始化函數(shù),后面使用Autoware注入的時(shí)候,會(huì)自動(dòng)的調(diào)用這個(gè)初始化函數(shù),生成對(duì)應(yīng)的對(duì)象
- 生產(chǎn)消息
- 主函數(shù),啟動(dòng)服務(wù)
構(gòu)建一個(gè)消費(fèi)者服務(wù)
-
yml配置和生產(chǎn)者一樣
-
config配置(無(wú),只需要指定需要監(jiān)聽(tīng)得隊(duì)列即可)
-
注冊(cè)消費(fèi)者監(jiān)聽(tīng)消息隊(duì)列
package com.atguigu.activemq.bootMqConsumer.consumer;import org.springframework.beans.factory.annotation.Value; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component;import javax.jms.JMSException; import javax.jms.TextMessage;@Component //只要是boot,要么是service,要么是Component public class Queue_Consumer {//監(jiān)聽(tīng)注冊(cè),指定需要監(jiān)聽(tīng)得隊(duì)列@JmsListener(destination = "${myqueue}")public void recieve(TextMessage textMessage) throws JMSException{System.out.println("***消費(fèi)者收到消息***"+textMessage);} }注意在這個(gè)過(guò)程中,python通過(guò)stomp協(xié)議在進(jìn)行隊(duì)列消息得傳輸過(guò)程中使用的二進(jìn)制流數(shù)進(jìn)行傳輸?shù)?#xff0c;所以,在java端接收python返回的數(shù)據(jù)的時(shí)候,需要將二進(jìn)制流先轉(zhuǎn)換為字符串,所以此處不能使用textmessage來(lái)定義接收的消息,而是使用BytesMessage來(lái)定義接收到的消息
package com.atguigu.activemq.bootMqConsumer.consumer;import org.springframework.beans.factory.annotation.Value; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component;import javax.jms.JMSException; import javax.jms.TextMessage;@Component //只要是boot,要么是service,要么是Component public class Queue_Consumer {//監(jiān)聽(tīng)注冊(cè),指定需要監(jiān)聽(tīng)得隊(duì)列@JmsListener(destination = "${myqueue}")public void recieve(TextMessage textMessage) throws JMSException{System.out.println("***消費(fèi)者收到消息***"+textMessage);} }
python端往隊(duì)列中發(fā)送消息和監(jiān)聽(tīng)隊(duì)列中的消息
使用stomp消息傳輸協(xié)議
在進(jìn)行消息消費(fèi)的時(shí)候,采用兩種模式
-
訓(xùn)練模式,只消費(fèi)對(duì)應(yīng)的隊(duì)列中最新的消息
-
推斷模式,將隊(duì)列中的數(shù)據(jù)來(lái)一條處理一條
4.傳輸?shù)南⒌母袷?/h3>
- java生產(chǎn)者生產(chǎn)消息的格式如下:
{"LD": {"LD":["PL":300,"JD":30,"WD":30]},"SX": {"SX":["PL":345,"JD":80,"WD":65]}
}
數(shù)據(jù)以JSON字符串的形式進(jìn)行傳輸
-
Java端接收數(shù)據(jù)并解析數(shù)據(jù)的方式
消費(fèi)者的代碼
接收的數(shù)據(jù)格式為BytesMessage
@JmsListener(destination = "activemq-queue-result")@Transactionalpublic void recieve(BytesMessage bytesMessage) throws JMSException {byte[] bys = null;//解析二進(jìn)制數(shù)據(jù)為字符串?dāng)?shù)據(jù)bys = new byte[(int) bytesMessage.getBodyLength()];bytesMessage.readBytes(bys);Gson gson = new Gson();//將數(shù)據(jù)轉(zhuǎn)換為指定格式的類,Map.class或者某個(gè)結(jié)構(gòu)體Map map = gson.fromJson(new String(bys),Map.class);System.out.println("***消費(fèi)者收到消息***"+map.toString());System.out.println("***消費(fèi)者處理消息結(jié)果***------>>" + result);}生產(chǎn)者業(yè)務(wù)代碼
業(yè)務(wù)實(shí)現(xiàn),通過(guò)yaml的配置,來(lái)找到需要加載的表名和字段,根據(jù)約定的數(shù)據(jù)格式,轉(zhuǎn)換為json發(fā)送給MQ消息隊(duì)列
工具:Yaml2Gson,用來(lái)將yaml文件轉(zhuǎn)換為gson格式()、Gson
public void send2Mq(String queryTable) {Map featureMap = new HashMap();Map configMap = (Map) Yaml2Gson.yamlToMap(zcYml).get(queryTable); // System.out.println(configMap.keySet().toString());featureMap.put("config",configMap );StringBuilder conditionBuilder = new StringBuilder();String features = configMap.keySet().toString();StringBuilder columnsBuilder = new StringBuilder();//拼湊出各個(gè)列的查詢//查詢的特征項(xiàng)為空的,依據(jù)其是否為連續(xù)值為其設(shè)定初始值None or 0for(String feature: features.substring(1,features.length()-1).split(", ")){Map featsMap = (Map)configMap.get(feature);if((Integer)featsMap.get("isContinuous") == 0) columnsBuilder.append("nvl(to_char("+feature+"),'None') "+feature);else columnsBuilder.append("nvl("+feature+",0) "+feature);conditionBuilder.append(feature+" is not null or ");columnsBuilder.append(", ");}String columns = columnsBuilder.substring(0,columnsBuilder.length()-2);String condition = conditionBuilder.substring(0,conditionBuilder.length()-3);List mess = combineQueryMapper.getFeatures(columns,queryTable,condition);//featureMap.put("data",mess);Map resMap = new HashMap();resMap.put(queryTable,featureMap);//數(shù)據(jù)格式如下produceQueue.produceTrainMsg(new Gson().toJson(resMap));}
5.python解析數(shù)據(jù)的方式
如果是二進(jìn)制的數(shù)據(jù)流就直接傳輸,如果是其他形式的數(shù)據(jù),通過(guò)json.dumps將數(shù)據(jù)轉(zhuǎn)為json字符串進(jìn)行傳輸,并在java中進(jìn)行解析
if not isinstance(msg, bytes):msg_json = json.dumps(msg)# 二進(jìn)制流直接傳輸? else:msg_json = msg con.connect(username='admin', passcode='admin', wait=True) ##通過(guò)這個(gè)函數(shù)發(fā)送數(shù)據(jù)的時(shí)候,是自動(dòng)會(huì)轉(zhuǎn)換為二進(jìn)制數(shù)據(jù)流的(轉(zhuǎn)換為二進(jìn)制數(shù)據(jù)流的json格式的字符串) con.send(destination=queue_name, body=msg_json, content_type='text/plain')6.總結(jié)
在不同架構(gòu)的代碼之間傳輸傳遞不同結(jié)構(gòu)的結(jié)構(gòu)體的數(shù)據(jù),使用JSON字符串能夠最大程度的保留數(shù)據(jù)的原本形式。配合使用消息中間鍵通過(guò)數(shù)據(jù)流就能夠很好的將兩種不同的語(yǔ)言進(jìn)行組合使用,發(fā)揮他們各自的優(yōu)勢(shì),并且規(guī)定好特定的數(shù)據(jù)格式之后,還能夠很容易的將兩塊的功能進(jìn)行松耦合并且達(dá)到不同模塊復(fù)用的目的。同時(shí)在使用的過(guò)程中一定要注意處理好數(shù)據(jù)的類型和數(shù)據(jù)為空的情況(根據(jù)實(shí)際的數(shù)據(jù)處理業(yè)務(wù)來(lái)將為空的數(shù)據(jù)填寫入合適的值),無(wú)論是python還是java,因?yàn)閚ull類型一般都是使用object這種大的類來(lái)向下兼容的,傳遞過(guò)來(lái)的數(shù)字都被轉(zhuǎn)換為object類型的數(shù)據(jù)。最后第一次完全原創(chuàng)的寫一篇自己的開(kāi)發(fā)總結(jié),希望大家能夠留言指正以及私信討論!
總結(jié)
以上是生活随笔為你收集整理的java与python之间的混合开发的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: git 报错
- 下一篇: 用Python实现电子邮件接收程序(PO