当前位置:   article > 正文

webhook-django框架_django webhook

django webhook

 测试接口的网站:

Webhook.site - Test, process and transform emails and HTTP requests

 接口测试的软件:

postman

部分代码,其余代码见资源处

  1. from datetime import datetime
  2. from http.client import OK
  3. import ipaddress, requests, datetime
  4. from xmlrpc.client import FastParser
  5. import pytz
  6. from django.http import HttpResponse
  7. from django.shortcuts import get_object_or_404, render, redirect
  8. from .models import MessageHistory, PSBServer
  9. from .forms import PSBServerForm
  10. from http import HTTPStatus
  11. from django.views.decorators.csrf import csrf_exempt
  12. from hook.models import PSBServer
  13. def save_history(device_subnet_ip, device_subnet_mask, full_path, response_code, error_text, redirect_rul, is_success):
  14. new_history=MessageHistory()
  15. new_history.device_subnet='{0}/{1}'.format(device_subnet_ip, device_subnet_mask)
  16. new_history.full_path=full_path
  17. new_history.response_code=response_code
  18. new_history.error_text=error_text
  19. new_history.history_time=datetime.datetime.now(tz=pytz.utc)
  20. new_history.redirect_url=redirect_rul
  21. new_history.is_success=is_success
  22. new_history.save()
  23. def get_ip_address(request):
  24. user_ip_address = request.META.get('HTTP_X_FORWARDED_FOR')
  25. if user_ip_address:
  26. ip = user_ip_address.split(',')[0]
  27. else:
  28. ip = request.META.get('REMOTE_ADDR')
  29. return ip
  30. @csrf_exempt
  31. def index(request):
  32. ip_addr=get_ip_address(request)
  33. print(request)
  34. full_path=request.get_full_path()[1:]
  35. # 查找该 IP 的子网 PSB 服务器
  36. psb_servers = PSBServer.objects.all()
  37. is_done = False
  38. # 获取请求中的location参数
  39. location = request.GET.get('location', '')
  40. for psb_server in psb_servers:
  41. # if ipaddress.ip_address(ip_addr) in ipaddress.ip_network('{0}/{1}'.format(psb_server.device_subnet_ip, psb_server.device_subnet_mask)):
  42. # 将信息重定向到相应的 PSB 服务器
  43. if location == psb_server.name:
  44. # 如果location等于服务器的name,将数据转发到服务器的ip_address和tcp_port
  45. target_url = f'http://{psb_server.ip_address}:{psb_server.tcp_port}/{full_path}'
  46. try:
  47. response = requests.get(target_url)
  48. # 这里你可以根据响应做进一步处理,例如记录历史等
  49. is_done = True
  50. return HttpResponse('Data forwarded successfully.')
  51. except Exception as e:
  52. is_done = False
  53. if not is_done:
  54. save_history('0.0.0.0', '0', full_path, 'NO_MATCH', 'there is no matched psb server', 'NO_MATCH',
  55. False)
  56. # 如果发送失败,可以记录错误或返回错误响应
  57. return HttpResponse('Data forwarding failed: ' + str(e), status=500)
  58. directory = ''
  59. return HttpResponse('OK,no location in your web ')
  60. def management(request):
  61. psb_servers=PSBServer.objects.all()
  62. content={
  63. 'psbs':psb_servers,
  64. }
  65. return render(request, 'hook/management.html', content)
  66. def history(request):
  67. history=MessageHistory.objects.all().order_by('-pk')[:100]
  68. content={
  69. 'histories':history,
  70. }
  71. return render(request, 'hook/history.html', content)
  72. def psb(request):
  73. if request.method=='POST':
  74. form=PSBServerForm(request.POST)
  75. if form.is_valid():
  76. new_psb=form.save(commit=False)
  77. new_psb.save()
  78. return redirect('hook:management')
  79. psbServerForm = PSBServerForm()
  80. content={
  81. 'form':psbServerForm,
  82. }
  83. return render(request, 'hook/psb.html', content)
  84. def delete_psb(request, psb_id):
  85. psb=get_object_or_404(PSBServer, pk=psb_id)
  86. psb.delete()
  87. return redirect('hook:management')

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Guff_9hys/article/detail/912031
推荐阅读
  

闽ICP备14008679号