wxpython-通过request远程下载网络zip文件,并解压安装文件

作者阿里云代理 文章分类 分类:linux图文教程 阅读次数 已被围观 720
  • 明确需求(升级程序)

1.通过wxpython,产生一个窗体,窗体上有一段[文字标签],一个[进度条],一个[开始按钮]。

2.点击【开始按钮】,下载网络资源文件http://example.cn/test.zip。进度条和文字标签同时显示百分比

3.下载完成后,解压到指定目录。如果指定目录下有文件,则覆盖掉。

  • 设计界面

image.png

  • 程序关键点

1.wxpython用的进度条控件是wx.guage.定义如下:

self.gauge_1 = wx.Gauge(self, wx.ID_ANY, 100, style=wx.GA_HORIZONTAL | wx.GA_SMOOTH)

设置进度条的方法如下:msg填的是数字

self.download_gauge.SetValue(msg)

2.因为下载时间长,所以需要在主线程外再启用一个线程下载,避免程序假死。

3.通过request.Session().get方法下载比request.get下载要快

requests.Session().get(url, verify=False, stream=True)

4.通过stream的形式可以获得下载进度。如下,message是下载进度,例如10%,message为10

for chunk in response.iter_content(chunk_size=512):  if chunk:  code.write(chunk)  code.flush()  i = i + 512  number = int(i)  message = number * 100 / total_length

5.Zipfile解压文件,如果解压目录下有同名文件,则会直接覆盖掉

azip = zipfile.ZipFile(zip_file_path) azip.extractall(path=unzip_to_path)
  • 完整代码(frame窗体和event事件)

updateOTAFrame.py(窗体文件,用wxglade创建)

#!/usr/bin/env python # -*- coding: UTF-8 -*- # # generated by wxGlade 0.9.0pre on Thu Sep 06 09:41:05 2018 # import wx # begin wxGlade: dependencies # end wxGlade # begin wxGlade: extracode # end wxGlade class MyFrame(wx.Frame):  def __init__(self, *args, **kwds):  # begin wxGlade: MyFrame.__init__  kwds["style"] = kwds.get("style", 0) | wx.DEFAULT_FRAME_STYLE  wx.Frame.__init__(self, *args, **kwds)  self.SetSize((475, 137))  self.label_1 = wx.StaticText(self, wx.ID_ANY, "start")  self.gauge_1 = wx.Gauge(self, wx.ID_ANY, 100, style=wx.GA_HORIZONTAL | wx.GA_SMOOTH)  self.button_1 = wx.Button(self, wx.ID_ANY, "start")  self.__set_properties()  self.__do_layout()  # end wxGlade  def __set_properties(self):  # begin wxGlade: MyFrame.__set_properties  self.SetTitle(u"\u5347\u7ea7\u7a0b\u5e8f")  # end wxGlade  def __do_layout(self):  # begin wxGlade: MyFrame.__do_layout  sizer_1 = wx.BoxSizer(wx.VERTICAL)  sizer_2 = wx.BoxSizer(wx.VERTICAL)  sizer_2.Add(self.label_1, 1, 0, 0)  sizer_2.Add(self.gauge_1, 1, wx.EXPAND, 0)  sizer_2.Add(self.button_1, 1, wx.ALIGN_CENTER, 0)  sizer_1.Add(sizer_2, 1, wx.EXPAND, 0)  self.SetSizer(sizer_1)  self.Layout()  # end wxGlade # end of class MyFrame

updateOTAevent.py(事件文件,和窗体文件分开)

# -*- coding: UTF-8 -*- import os import zipfile from threading import Thread import requests import wx from wx.lib.pubsub import pub from view.window import updateOTAFrame class MyApp(wx.App):  def OnInit(self):  self.frame = updateOTAFrame.MyFrame(None, wx.ID_ANY, "")  self.frame.CenterOnScreen()  self.download_gauge = self.frame.gauge_1  self.start_button = self.frame.button_1  self.frame.timer = wx.Timer(self.frame)  # 创建定时器  # 绑定一个定时器事件,wxpython存在bug,不设定定时器,pub功能不会正常启用  self.frame.Bind(wx.EVT_TIMER, self.on_timer, self.frame.timer)  self.frame.Show()  self.start_button.Bind(wx.EVT_BUTTON, self.start_event)  pub.subscribe(self.update_ota, "ota_topic")  pub.subscribe(self.download_text_topic, "download_text_topic")  self.download_text = self.frame.label_1  self.download_text.SetLabelText("点击开始升级按钮,即刻开始升级")  self.start_button.SetLabelText("开始升级")  pub.subscribe(self.close_frame, "close_download_topic")  return True  # 下载完成后,关闭窗口  def close_frame(self, msg):  self.frame.Destroy()  # 开始下载按钮事件  def start_event(self, event):  self.start_button.SetLabelText("正在升级")  self.download_text.SetLabelText("正在下载升级包,请不要关闭程序,目前进度:0%")  self.start_button.Disable()  GuageThread()  event.Skip()  # 控制下载的时候的文字  def download_text_topic(self, msg):  if msg < 100:  self.download_text.SetLabelText("正在下载升级包,请不要关闭程序,目前进度:" + str(msg) + "%")  else:  self.download_text.SetLabelText('下载成功,现在开始解压,请耐心等待大于10秒')  self.start_button.SetLabelText('正在解压')  # 控制下载的进度条  def update_ota(self, msg):  if msg < 100:  # 如果是数字,说明线程正在执行,显示数字  self.download_gauge.SetValue(msg)  else:  self.download_gauge.SetValue(msg)  def on_timer(self, evt):  # 定时执行检查网络状态  pass # 另外启动一个线程来控制进度条,不然程序会假死 class GuageThread(Thread):  def __init__(self):  # 线程实例化时立即启动  Thread.__init__(self)  self.start()  def run(self):  # 线程执行的代码  self.download_file()  def download_file(self):  # url = "http://example.cn/test.zip" 是网络上的zip压缩包文件  url = "http://example.cn/test.zip"  # 通过Session来下载,速度比直接requests.get快了大约百分之30  response = requests.Session().get(url, verify=False, stream=True)  total_length = int(response.headers.get("Content-Length"))  with open(os.path.abspath(os.getcwd() + "/resource/download/new.zip"), "wb") as code:  i = 0  temp = 0  # 用chunk_size的方法来下载,可以知道当前的下载进度。chunk_size影响每次写入的内存大小  for chunk in response.iter_content(chunk_size=512):  if chunk:  code.write(chunk)  code.flush()  i = i + 512  number = int(i)  # 因为进度条的长度设置成了100,所以这里要乘以100  message = number * 100 / total_length  wx.CallAfter(pub.sendMessage, "ota_topic", msg=message)  if temp != message:  temp = message  wx.CallAfter(pub.sendMessage, "download_text_topic", msg=message)  filepath = os.path.abspath(os.getcwd() + "/resource/download/new.zip")  # 直接放在程序根目录下了  foldpath = os.path.abspath(os.getcwd())  self.unzip(filepath, foldpath)  wx.CallAfter(pub.sendMessage, "close_download_topic", msg=1)  # 解压文件用的zipfile,将解压的文件,放置到指定路径下(覆盖复制)  def unzip(self, zip_file_path, unzip_to_path):  unzip_flag = False  try:  check_zip_flag = zipfile.is_zipfile(zip_file_path)  if check_zip_flag:  azip = zipfile.ZipFile(zip_file_path)  azip.extractall(path=unzip_to_path)  unzip_flag = True  except Exception as e:  print e.message  finally:  print "unzip_flag==========", unzip_flag  return unzip_flag # end of class MyApp if __name__ == "__main__":  app = MyApp(0)  app.MainLoop()

注:  把 url = "http://example.cn/test.zip" 修改为自己要下载的网络zip压缩包文件。然后在updateOTAevent.py中运行即可。zip文件会下载完成后解压至程序根目录

本公司销售:阿里云、腾讯云、百度云、天翼云、金山大米云、金山企业云盘!可签订合同,开具发票。

我有话说: